Thanks, Aljoscha, for your suggestion. I have updated the FLIP. Any comments are welcome.
Best,
Guowei

On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc, but we can address
> them later or during implementation.
>
> I also think that we probably need the FAIL, RETRY, SUCCESS result for
> globalCommit(), but we can also do that as a later addition.
>
> So I think we're good to go to update the FLIP, make any last-minute
> changes, and then vote.
>
> Best,
> Aljoscha
>
> On 23.09.20 06:13, Guowei Ma wrote:
> > Hi, all
> >
> > Thanks, everyone, for your ideas and suggestions. I will try to
> > summarize the consensus again :). Correct me if I am wrong or have
> > misunderstood you.
> >
> > ## Consensus-1
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution modes.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports real transactions. The FLIP focuses more on the
> > semantics the new sink API should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`, and the `Flush` method should
> > be removed.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > focused.
> >
> > ## Consensus-2
> >
> > 1. What should the "Unified Sink API" support/cover? It includes two
> > aspects: (1) the same sink implementation should work for both the batch
> > and stream execution modes; (2) in the long run, we should give the sink
> > developer the ability to build "arbitrary" topologies. But for Flink 1.12
> > we should focus only on satisfying the S3/HDFS/Iceberg sinks.
> > 2. Because the batch execution mode does not have normal checkpoints, the
> > sink developer should no longer depend on them if we want a unified sink.
> > 3. We could benefit from providing an asynchronous `Writer` version. But
> > because the unified sink is already very complicated, we won't add this in
> > the first version.
> >
> >
> > Based on this consensus, I would propose the first version of the new
> > sink API as follows. What do you think? Any comments are welcome.
> >
> > /**
> >  * This interface lets the sink developer build a simple transactional sink
> >  * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >  * This sink topology includes one {@link Writer} + one {@link Committer} +
> >  * one {@link GlobalCommitter}.
> >  * The {@link Writer} is responsible for producing the committable.
> >  * The {@link Committer} is responsible for committing a single
> >  * committable.
> >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> >  * committable, which we call the global committable.
> >  *
> >  * Both the {@link Committer} and the {@link GlobalCommitter} are
> >  * optional.
> >  */
> > interface TSink<IN, CommT, GCommT, WriterS> {
> >
> >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> >
> >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);
> >
> >     Optional<Committer<CommT>> createCommitter();
> >
> >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> >
> >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >
> >     Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
> > }
> >
> > /**
> >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> >  * committable, which we call the global committable.
> >  */
> > interface GlobalCommitter<CommT, GCommT> {
> >
> >     /**
> >      * This method is called when restoring from a failover.
> >      * @param globalCommittables the global committables that were not
> >      *     committed in the previous session.
> >      * @return the global committables that should be committed again
> >      *     in the current session.
> >      */
> >     List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
> >
> >     /**
> >      * Computes an aggregated committable from a collection of committables.
> >      * @param committables a collection of committables that need to be combined
> >      * @return an aggregated committable
> >      */
> >     GCommT combine(List<CommT> committables);
> >
> >     void commit(List<GCommT> globalCommittables);
> >
> >     /**
> >      * There are no committables any more.
> >      */
> >     void endOfInput();
> > }
> >
> > Best,
> > Guowei
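To make the flow from Writer to committable to GlobalCommitter a little more concrete, here is a minimal, self-contained Java sketch under stated assumptions. It is not Flink code: only the `GlobalCommitter` interface is taken from the proposal (Javadoc dropped), while `PartFileWriter`, `Manifest` and `ManifestCommitter` are hypothetical names invented for illustration. The sketch leaves out `InitContext`, the serializers and the per-subtask `Committer`, assumes the framework calls the writer's `prepareCommit()` before each barrier, and uses an in-memory set as a stand-in for the external system (for example a table metastore) that a real sink would query when filtering recovered global committables.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GlobalCommitSketch {

    // Copied from the proposal (Javadoc omitted).
    interface GlobalCommitter<CommT, GCommT> {
        List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
        GCommT combine(List<CommT> committables);
        void commit(List<GCommT> globalCommittables);
        void endOfInput();
    }

    // Hypothetical writer: buffers records; prepareCommit() "closes" the
    // in-progress part file and emits its name as the committable.
    static final class PartFileWriter {
        private final String name;
        private final List<String> buffer = new ArrayList<>();
        private int part = 0;

        PartFileWriter(String name) { this.name = name; }

        void write(String record) { buffer.add(record); }

        // Assumed to be invoked by the framework, e.g. from prepareSnapshotPreBarrier.
        List<String> prepareCommit() {
            if (buffer.isEmpty()) {
                return List.of();
            }
            String file = name + "-part-" + part++;
            buffer.clear();
            return List.of(file);
        }
    }

    // Hypothetical global committable: one manifest listing the files of a checkpoint.
    static final class Manifest {
        final long id;
        final List<String> files;

        Manifest(long id, List<String> files) { this.id = id; this.files = List.copyOf(files); }
    }

    // Hypothetical global committer; committedIds stands in for the external system.
    static final class ManifestCommitter implements GlobalCommitter<String, Manifest> {
        final Set<Long> committedIds = new HashSet<>();
        final List<String> visibleFiles = new ArrayList<>();
        private long nextId = 0;

        @Override
        public List<Manifest> filterRecoveredCommittables(List<Manifest> globalCommittables) {
            List<Manifest> pending = new ArrayList<>();
            for (Manifest m : globalCommittables) {
                if (!committedIds.contains(m.id)) {
                    pending.add(m); // not visible yet, so it must be committed again
                }
            }
            return pending;
        }

        @Override
        public Manifest combine(List<String> committables) {
            return new Manifest(nextId++, committables);
        }

        @Override
        public void commit(List<Manifest> globalCommittables) {
            for (Manifest m : globalCommittables) {
                if (committedIds.add(m.id)) { // idempotent: already-committed manifests are skipped
                    visibleFiles.addAll(m.files);
                }
            }
        }

        @Override
        public void endOfInput() {
            // Nothing is buffered in this sketch, so there is nothing left to flush.
        }
    }

    public static void main(String[] args) {
        PartFileWriter w0 = new PartFileWriter("w0");
        PartFileWriter w1 = new PartFileWriter("w1");
        w0.write("a");
        w0.write("b");
        w1.write("c");

        // Checkpoint 0: gather committables from all writers, combine them, commit.
        List<String> committables = new ArrayList<>(w0.prepareCommit());
        committables.addAll(w1.prepareCommit());
        ManifestCommitter gc = new ManifestCommitter();
        Manifest m0 = gc.combine(committables);
        gc.commit(List.of(m0));

        // Simulated failover: m0 was committed, m1 was not, so only m1 is retried.
        w0.write("d");
        Manifest m1 = gc.combine(w0.prepareCommit());
        List<Manifest> retry = gc.filterRecoveredCommittables(List.of(m0, m1));
        gc.commit(retry);
        gc.endOfInput();

        System.out.println(gc.visibleFiles); // [w0-part-0, w1-part-0, w0-part-1]
        System.out.println(retry.size()); // 1
    }
}
```

One design choice worth noting in this sketch: `commit()` is idempotent per manifest id, so re-committing a recovered global committable that has already landed does no harm, and `filterRecoveredCommittables()` only avoids redundant work rather than being required for correctness.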