Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments are
welcome.

Best,
Guowei


On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc but we can address
> them later or during implementation.
>
> I also think that we probably need the FAIL,RETRY,SUCCESS result for
> globalCommit() but we can also do that as a later addition.
>
> So I think we're good to go to update the FLIP, do any last minute
> changes and then vote.
>
> Best,
> Aljoscha
>
> On 23.09.20 06:13, Guowei Ma wrote:
> > Hi, all
> >
> > Thank everyone very much for your ideas and suggestions. I would try to
> > summarize again the consensus :). Correct me if I am wrong or
> misunderstand
> > you.
> >
> > ## Consensus-1
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution mode.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports the real transactions. The FLIP focuses more on the
> > semantics the new sink api should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > focused.
> >
> > ## Consensus-2
> >
> > 1. What should the “Unified Sink API” support/cover? It includes two
> > aspects. 1. The same sink implementation would work for both the batch
> and
> > stream execution mode. 2. In the long run we should give the sink
> developer
> > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > 2. Because the batch execution mode does not have the normal checkpoint
> the
> > sink developer should not depend on it any more if we want a unified
> sink.
> > 3. We can benefit by providing an asynchronous `Writer` version. But
> > because the unified sink is already very complicated, we don’t add this
> in
> > the first version.
> >
> >
> > According to these consensus I would propose the first version of the new
> > sink api as follows. What do you think? Any comments are welcome.
> >
> > /**
> >   * This interface lets the sink developer build a simple transactional
> sink
> > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >   * This sink topology includes one {@link Writer} + one {@link
> Committer} +
> > one {@link GlobalCommitter}.
> >   * The {@link Writer} is responsible for producing the committable.
> >   * The {@link Committer} is responsible for committing a single
> > committables.
> >   * The {@link GlobalCommitter} is responsible for committing an
> aggregated
> > committable, which we called global committables.
> >   *
> >   * But both the {@link Committer} and the {@link GlobalCommitter} are
> > optional.
> >   */
> > interface TSink<IN, CommT, GCommT, WriterS> {
> >
> >          Writer<IN, CommT, WriterS> createWriter(InitContext
> initContext);
> >
> >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
> initContext,
> > List<WriterS> states);
> >
> >          Optional<Committer<CommT>> createCommitter();
> >
> >          Optional<GlobalCommitter<CommT, GCommT>>
> createGlobalCommitter();
> >
> >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >
> >          Optional<SimpleVersionedSerializer<GCommT>>
> > getGlobalCommittableSerializer();
> > }
> >
> > /**
> >   * The {@link GlobalCommitter} is responsible for committing an
> aggregated
> > committable, which we called global committables.
> >   */
> > interface GlobalCommitter<CommT, GCommT> {
> >
> >          /**
> >           * This method is called when restoring from a failover.
> >           * @param globalCommittables the global committables that are
> not
> > committed in the previous session.
> >           * @return the global committables that should be committed
> again
> > in the current session.
> >           */
> >          List<GCommT> filterRecoveredCommittables(List<GCommT>
> > globalCommittables);
> >
> >          /**
> >           * Compute an aggregated committable from a collection of
> > committables.
> >           * @param committables a collection of committables that are
> needed
> > to combine
> >           * @return an aggregated committable
> >           */
> >          GCommT combine(List<CommT> committables);
> >
> >          void commit(List<GCommT> globalCommittables);
> >
> >          /**
> >           * There are no committables any more.
> >           */
> >          void endOfInput();
> > }
> >
> > Best,
> > Guowei
>
>

Reply via email to