Guowei,

Thanks a lot for updating the wiki page. It looks great.

I noticed one inconsistency between the wiki and your last summary email regarding the GlobalCommitter interface. I think the version in the summary email is the intended one, because global committables from previously failed commits can roll over and accumulate into a list.

CommitResult commit(GlobalCommT globalCommittable); // in the wiki
=>
CommitResult commit(List<GlobalCommT> globalCommittables); // in the summary email

I also have a clarifying question regarding WriterStateT. Since IcebergWriter won't need to checkpoint any state, should we set it to the *Void* type? Since getWriterStateSerializer() returns an Optional, that part is clear: we can return Optional.empty().

Thanks,
Steven

On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:

> Thanks, Aljoscha, for your suggestion. I have updated the FLIP. Any comments
> are welcome.
>
> Best,
> Guowei
>
>
> On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Yes, that sounds good! I'll probably have some comments on the FLIP
> > about the names of the generic parameters and the Javadoc, but we can
> > address them later or during implementation.
> >
> > I also think that we probably need the FAIL/RETRY/SUCCESS result for
> > globalCommit(), but we can also do that as a later addition.
> >
> > So I think we're good to go to update the FLIP, make any last-minute
> > changes, and then vote.
> >
> > Best,
> > Aljoscha
> >
> > On 23.09.20 06:13, Guowei Ma wrote:
> > > Hi all,
> > >
> > > Thanks very much, everyone, for your ideas and suggestions. I will try
> > > to summarize the consensus again :). Correct me if I am wrong or have
> > > misunderstood you.
> > >
> > > ## Consensus-1
> > >
> > > 1. The motivation of the unified sink API is to decouple the sink
> > > implementation from the different runtime execution modes.
> > > 2. The initial scope of the unified sink API only covers file-system-type
> > > sinks, which support real transactions. The FLIP focuses more on the
> > > semantics the new sink API should support.
> > > 3. We prefer the first alternative API, which could give the framework
> > > more opportunity to optimize.
> > > 4. The `Writer` needs a new method, `prepareCommit`, which would be
> > > called from `prepareSnapshotPreBarrier`; the `Flush` method should be
> > > removed.
> > > 5. The FLIP could move the `Snapshot & Drain` section out in order to be
> > > more focused.
> > >
> > > ## Consensus-2
> > >
> > > 1. What should the “Unified Sink API” support/cover? It covers two
> > > aspects:
> > >    a. The same sink implementation should work for both the batch and
> > >    streaming execution modes.
> > >    b. In the long run, we should give sink developers the ability to
> > >    build “arbitrary” topologies. But for Flink 1.12, we should focus
> > >    only on satisfying the S3/HDFS/Iceberg sinks.
> > > 2. Because batch execution mode does not have normal checkpoints, the
> > > sink developer should no longer depend on them if we want a unified
> > > sink.
> > > 3. We could benefit from providing an asynchronous `Writer` version, but
> > > because the unified sink is already very complicated, we won't add it
> > > in the first version.
> > >
> > >
> > > Based on this consensus, I propose the following first version of the
> > > new sink API. What do you think? Any comments are welcome.
> > >
> > > /**
> > >  * This interface lets the sink developer build a simple transactional
> > >  * sink topology pattern, which satisfies the HDFS/S3/Iceberg sinks.
> > >  * This sink topology includes one {@link Writer} + one {@link Committer} +
> > >  * one {@link GlobalCommitter}.
> > >  * The {@link Writer} is responsible for producing the committable.
> > >  * The {@link Committer} is responsible for committing a single
> > >  * committable.
> > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> > >  * committable, which we call the global committable.
> > >  *
> > >  * Both the {@link Committer} and the {@link GlobalCommitter} are
> > >  * optional.
> > >  */
> > > interface TSink<IN, CommT, GCommT, WriterS> {
> > >
> > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> > >
> > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
> > >             List<WriterS> states);
> > >
> > >     Optional<Committer<CommT>> createCommitter();
> > >
> > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> > >
> > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> > >
> > >     Optional<SimpleVersionedSerializer<GCommT>>
> > >             getGlobalCommittableSerializer();
> > > }
> > >
> > > /**
> > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> > >  * committable, which we call the global committable.
> > >  */
> > > interface GlobalCommitter<CommT, GCommT> {
> > >
> > >     /**
> > >      * This method is called when restoring from a failover.
> > >      * @param globalCommittables the global committables that were not
> > >      *        committed in the previous session.
> > >      * @return the global committables that should be committed again
> > >      *         in the current session.
> > >      */
> > >     List<GCommT> filterRecoveredCommittables(
> > >             List<GCommT> globalCommittables);
> > >
> > >     /**
> > >      * Computes an aggregated committable from a collection of
> > >      * committables.
> > >      * @param committables a collection of committables that need to be
> > >      *        combined
> > >      * @return an aggregated committable
> > >      */
> > >     GCommT combine(List<CommT> committables);
> > >
> > >     void commit(List<GCommT> globalCommittables);
> > >
> > >     /**
> > >      * Signals that there are no more committables.
> > >      */
> > >     void endOfInput();
> > > }
> > >
> > > Best,
> > > Guowei
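To illustrate the rollover argument for why the global commit should take a list, here is a minimal sketch. It is not Flink code: `RolloverDemo`, `ManifestCommitter`, and `CommitDriver` are hypothetical names, the `GlobalCommitter` below is a simplified local mirror of the proposed interface (with `endOfInput()` omitted), and it assumes a failed commit is signaled by throwing an exception, after which the framework keeps the global committable pending and hands it to the next commit together with the new one.

```java
import java.util.ArrayList;
import java.util.List;

public class RolloverDemo {

    /** Simplified, hypothetical mirror of the proposed GlobalCommitter (endOfInput omitted). */
    interface GlobalCommitter<CommT, GCommT> {
        List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);

        GCommT combine(List<CommT> committables);

        // Assumption: a failed commit throws, so the caller can keep the input pending.
        void commit(List<GCommT> globalCommittables) throws Exception;
    }

    /** Hypothetical committer: joins file names into one "manifest" and records it in memory. */
    static class ManifestCommitter implements GlobalCommitter<String, String> {
        final List<String> committedManifests = new ArrayList<>();
        boolean failNextCommit = false; // simulates one transient catalog failure

        public List<String> filterRecoveredCommittables(List<String> recovered) {
            // Only manifests that never made it into the "table" need another attempt.
            List<String> toRetry = new ArrayList<>();
            for (String manifest : recovered) {
                if (!committedManifests.contains(manifest)) {
                    toRetry.add(manifest);
                }
            }
            return toRetry;
        }

        public String combine(List<String> fileNames) {
            return String.join("+", fileNames);
        }

        public void commit(List<String> manifests) throws Exception {
            if (failNextCommit) {
                failNextCommit = false;
                throw new Exception("transient commit failure");
            }
            committedManifests.addAll(manifests);
        }
    }

    /** Hypothetical framework-side driver that rolls failed global committables over. */
    static class CommitDriver<CommT, GCommT> {
        final GlobalCommitter<CommT, GCommT> committer;
        final List<GCommT> pending = new ArrayList<>();
        final List<Integer> batchSizes = new ArrayList<>(); // size of each list passed to commit()

        CommitDriver(GlobalCommitter<CommT, GCommT> committer) {
            this.committer = committer;
        }

        void onCheckpointComplete(List<CommT> committables) {
            pending.add(committer.combine(committables));
            batchSizes.add(pending.size());
            try {
                committer.commit(new ArrayList<>(pending));
                pending.clear();
            } catch (Exception e) {
                // Keep everything pending; it is retried together with the next checkpoint's committable.
            }
        }
    }

    public static void main(String[] args) {
        ManifestCommitter committer = new ManifestCommitter();
        CommitDriver<String, String> driver = new CommitDriver<>(committer);

        committer.failNextCommit = true;
        driver.onCheckpointComplete(List.of("a.parquet", "b.parquet")); // fails, stays pending
        driver.onCheckpointComplete(List.of("c.parquet"));              // commits both at once

        System.out.println(driver.batchSizes);            // [1, 2]
        System.out.println(committer.committedManifests); // [a.parquet+b.parquet, c.parquet]
        System.out.println(committer.filterRecoveredCommittables(
                List.of("a.parquet+b.parquet", "d.parquet"))); // [d.parquet]
    }
}
```

The second call to `commit` receives two global committables, which a single-element `commit(GlobalCommT)` signature could not express without the framework calling it repeatedly and tracking partial success itself.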