> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it
might
> be too complicated.

If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
to be retried. E.g. we have some outage with metadata service, commits for
checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. we don't
want to commit them one by one. It is faster to commit the whole list as
one batch.

> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed

That is when the job restarted due to failure or deployment.


On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, all
>
> From the above discussion we could find that FLIP focuses on providing an
> unified transactional sink API. So I updated the FLIP's title to "Unified
> Transactional Sink API". But I found that the old link could not be opened
> again.
>
> I would update the link[1] here. Sorry for the inconvenience.
>
> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi, Steven
> >
> > >>I also have a clarifying question regarding the WriterStateT. Since
> > >>IcebergWriter won't need to checkpoint any state, should we set it to
> > *Void*
> > >>type? Since getWriterStateSerializer() returns Optional, that is clear
> > and
> > >>we can return Optional.empty().
> >
> > Yes I think you could do it. If you return Optional.empty() we would
> > ignore all the state you return.
> >
> > Best,
> > Guowei
> >
> >
> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi,Steven
> >>
> >> Thank you for reading the FLIP so carefully.
> >> 1. The frame can not know which `GlobalCommT` to retry if we use the
> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> >> 2. Of course we can let the `commit` return more detailed info but it
> >> might be too complicated.
> >> 3. On the other hand, I think only when restoring IcebergSink needs a
> >> collection of `GlobalCommT` and giving back another collection of
> >> `GlobalCommT` that are not committed.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>
> >>> Guowei,
> >>>
> >>> Thanks a lot for updating the wiki page. It looks great.
> >>>
> >>> I noticed one inconsistency in the wiki with your last summary email
> for
> >>> GlobalCommitter interface. I think the version in the summary email is
> >>> the
> >>> intended one, because rollover from previous failed commits can
> >>> accumulate
> >>> a list.
> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> >>> =>
> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
> >>> summary email
> >>>
> >>> I also have a clarifying question regarding the WriterStateT. Since
> >>> IcebergWriter won't need to checkpoint any state, should we set it to
> >>> *Void*
> >>> type? Since getWriterStateSerializer() returns Optional, that is clear
> >>> and
> >>> we can return Optional.empty().
> >>>
> >>> Thanks,
> >>> Steven
> >>>
> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com>
> wrote:
> >>>
> >>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
> >>> comments are
> >>> > welcome.
> >>> >
> >>> > Best,
> >>> > Guowei
> >>> >
> >>> >
> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <
> aljos...@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> >>> > > about the names of generic parameters and the Javadoc but we can
> >>> address
> >>> > > them later or during implementation.
> >>> > >
> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result
> for
> >>> > > globalCommit() but we can also do that as a later addition.
> >>> > >
> >>> > > So I think we're good to go to update the FLIP, do any last minute
> >>> > > changes and then vote.
> >>> > >
> >>> > > Best,
> >>> > > Aljoscha
> >>> > >
> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> >>> > > > Hi, all
> >>> > > >
> >>> > > > Thank everyone very much for your ideas and suggestions. I would
> >>> try to
> >>> > > > summarize again the consensus :). Correct me if I am wrong or
> >>> > > misunderstand
> >>> > > > you.
> >>> > > >
> >>> > > > ## Consensus-1
> >>> > > >
> >>> > > > 1. The motivation of the unified sink API is to decouple the sink
> >>> > > > implementation from the different runtime execution mode.
> >>> > > > 2. The initial scope of the unified sink API only covers the file
> >>> > system
> >>> > > > type, which supports the real transactions. The FLIP focuses more
> >>> on
> >>> > the
> >>> > > > semantics the new sink api should support.
> >>> > > > 3. We prefer the first alternative API, which could give the
> >>> framework
> >>> > a
> >>> > > > greater opportunity to optimize.
> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which
> would
> >>> be
> >>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
> >>> method.
> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to
> >>> be
> >>> > more
> >>> > > > focused.
> >>> > > >
> >>> > > > ## Consensus-2
> >>> > > >
> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes
> >>> two
> >>> > > > aspects. 1. The same sink implementation would work for both the
> >>> batch
> >>> > > and
> >>> > > > stream execution mode. 2. In the long run we should give the sink
> >>> > > developer
> >>> > > > the ability of building “arbitrary” topologies. But for
> Flink-1.12
> >>> we
> >>> > > > should be more focused on only satisfying the S3/HDFS/Iceberg
> sink.
> >>> > > > 2. Because the batch execution mode does not have the normal
> >>> checkpoint
> >>> > > the
> >>> > > > sink developer should not depend on it any more if we want a
> >>> unified
> >>> > > sink.
> >>> > > > 3. We can benefit by providing an asynchronous `Writer` version.
> >>> But
> >>> > > > because the unified sink is already very complicated, we don’t
> add
> >>> this
> >>> > > in
> >>> > > > the first version.
> >>> > > >
> >>> > > >
> >>> > > > According to these consensus I would propose the first version of
> >>> the
> >>> > new
> >>> > > > sink api as follows. What do you think? Any comments are welcome.
> >>> > > >
> >>> > > > /**
> >>> > > >   * This interface lets the sink developer build a simple
> >>> transactional
> >>> > > sink
> >>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >>> > > >   * This sink topology includes one {@link Writer} + one {@link
> >>> > > Committer} +
> >>> > > > one {@link GlobalCommitter}.
> >>> > > >   * The {@link Writer} is responsible for producing the
> >>> committable.
> >>> > > >   * The {@link Committer} is responsible for committing a single
> >>> > > > committables.
> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >>> > > aggregated
> >>> > > > committable, which we called global committables.
> >>> > > >   *
> >>> > > >   * But both the {@link Committer} and the {@link
> GlobalCommitter}
> >>> are
> >>> > > > optional.
> >>> > > >   */
> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> >>> > > >
> >>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext
> >>> > > initContext);
> >>> > > >
> >>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
> >>> > > initContext,
> >>> > > > List<WriterS> states);
> >>> > > >
> >>> > > >          Optional<Committer<CommT>> createCommitter();
> >>> > > >
> >>> > > >          Optional<GlobalCommitter<CommT, GCommT>>
> >>> > > createGlobalCommitter();
> >>> > > >
> >>> > > >          SimpleVersionedSerializer<CommT>
> >>> getCommittableSerializer();
> >>> > > >
> >>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
> >>> > > > getGlobalCommittableSerializer();
> >>> > > > }
> >>> > > >
> >>> > > > /**
> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
> >>> > > aggregated
> >>> > > > committable, which we called global committables.
> >>> > > >   */
> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> >>> > > >
> >>> > > >          /**
> >>> > > >           * This method is called when restoring from a failover.
> >>> > > >           * @param globalCommittables the global committables
> that
> >>> are
> >>> > > not
> >>> > > > committed in the previous session.
> >>> > > >           * @return the global committables that should be
> >>> committed
> >>> > > again
> >>> > > > in the current session.
> >>> > > >           */
> >>> > > >          List<GCommT> filterRecoveredCommittables(List<GCommT>
> >>> > > > globalCommittables);
> >>> > > >
> >>> > > >          /**
> >>> > > >           * Compute an aggregated committable from a collection
> of
> >>> > > > committables.
> >>> > > >           * @param committables a collection of committables that
> >>> are
> >>> > > needed
> >>> > > > to combine
> >>> > > >           * @return an aggregated committable
> >>> > > >           */
> >>> > > >          GCommT combine(List<CommT> committables);
> >>> > > >
> >>> > > >          void commit(List<GCommT> globalCommittables);
> >>> > > >
> >>> > > >          /**
> >>> > > >           * There are no committables any more.
> >>> > > >           */
> >>> > > >          void endOfInput();
> >>> > > > }
> >>> > > >
> >>> > > > Best,
> >>> > > > Guowei
> >>> > >
> >>> > >
> >>> >
> >>>
> >>
>

Reply via email to