Guowei,

Thanks a lot for updating the wiki page. It looks great.

I noticed one inconsistency in the wiki with your last summary email for
GlobalCommitter interface. I think the version in the summary email is the
intended one, because rollover from previous failed commits can accumulate
a list.
CommitResult commit(GlobalCommT globalCommittable); // in the wiki
=>
CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
summary email

I also have a clarifying question regarding the WriterStateT. Since
IcebergWriter won't need to checkpoint any state, should we set it to *Void*
type? Since getWriterStateSerializer() returns Optional, that is clear and
we can return Optional.empty().

Thanks,
Steven

On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:

> Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments are
> welcome.
>
> Best,
> Guowei
>
>
> On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Yes, that sounds good! I'll probably have some comments on the FLIP
> > about the names of generic parameters and the Javadoc but we can address
> > them later or during implementation.
> >
> > I also think that we probably need the FAIL,RETRY,SUCCESS result for
> > globalCommit() but we can also do that as a later addition.
> >
> > So I think we're good to go to update the FLIP, do any last minute
> > changes and then vote.
> >
> > Best,
> > Aljoscha
> >
> > On 23.09.20 06:13, Guowei Ma wrote:
> > > Hi, all
> > >
> > > Thank everyone very much for your ideas and suggestions. I would try to
> > > summarize again the consensus :). Correct me if I am wrong or
> > misunderstand
> > > you.
> > >
> > > ## Consensus-1
> > >
> > > 1. The motivation of the unified sink API is to decouple the sink
> > > implementation from the different runtime execution mode.
> > > 2. The initial scope of the unified sink API only covers the file
> system
> > > type, which supports the real transactions. The FLIP focuses more on
> the
> > > semantics the new sink api should support.
> > > 3. We prefer the first alternative API, which could give the framework
> a
> > > greater opportunity to optimize.
> > > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
> more
> > > focused.
> > >
> > > ## Consensus-2
> > >
> > > 1. What should the “Unified Sink API” support/cover? It includes two
> > > aspects. 1. The same sink implementation would work for both the batch
> > and
> > > stream execution mode. 2. In the long run we should give the sink
> > developer
> > > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > > 2. Because the batch execution mode does not have the normal checkpoint
> > the
> > > sink developer should not depend on it any more if we want a unified
> > sink.
> > > 3. We can benefit by providing an asynchronous `Writer` version. But
> > > because the unified sink is already very complicated, we don’t add this
> > in
> > > the first version.
> > >
> > >
> > > According to these consensus I would propose the first version of the
> new
> > > sink api as follows. What do you think? Any comments are welcome.
> > >
> > > /**
> > >   * This interface lets the sink developer build a simple transactional
> > sink
> > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> > >   * This sink topology includes one {@link Writer} + one {@link
> > Committer} +
> > > one {@link GlobalCommitter}.
> > >   * The {@link Writer} is responsible for producing the committable.
> > >   * The {@link Committer} is responsible for committing a single
> > > committables.
> > >   * The {@link GlobalCommitter} is responsible for committing an
> > aggregated
> > > committable, which we called global committables.
> > >   *
> > >   * But both the {@link Committer} and the {@link GlobalCommitter} are
> > > optional.
> > >   */
> > > interface TSink<IN, CommT, GCommT, WriterS> {
> > >
> > >          Writer<IN, CommT, WriterS> createWriter(InitContext
> > initContext);
> > >
> > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
> > initContext,
> > > List<WriterS> states);
> > >
> > >          Optional<Committer<CommT>> createCommitter();
> > >
> > >          Optional<GlobalCommitter<CommT, GCommT>>
> > createGlobalCommitter();
> > >
> > >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
> > >
> > >          Optional<SimpleVersionedSerializer<GCommT>>
> > > getGlobalCommittableSerializer();
> > > }
> > >
> > > /**
> > >   * The {@link GlobalCommitter} is responsible for committing an
> > aggregated
> > > committable, which we called global committables.
> > >   */
> > > interface GlobalCommitter<CommT, GCommT> {
> > >
> > >          /**
> > >           * This method is called when restoring from a failover.
> > >           * @param globalCommittables the global committables that are
> > not
> > > committed in the previous session.
> > >           * @return the global committables that should be committed
> > again
> > > in the current session.
> > >           */
> > >          List<GCommT> filterRecoveredCommittables(List<GCommT>
> > > globalCommittables);
> > >
> > >          /**
> > >           * Compute an aggregated committable from a collection of
> > > committables.
> > >           * @param committables a collection of committables that are
> > needed
> > > to combine
> > >           * @return an aggregated committable
> > >           */
> > >          GCommT combine(List<CommT> committables);
> > >
> > >          void commit(List<GCommT> globalCommittables);
> > >
> > >          /**
> > >           * There are no committables any more.
> > >           */
> > >          void endOfInput();
> > > }
> > >
> > > Best,
> > > Guowei
> >
> >
>

Reply via email to