I should clarify my last email a little more.

Take the example where commits for checkpoints 1-100 failed while the job
was still up (processing records and uploading files). When the commit for
checkpoint 101 comes, IcebergSink would prefer that the framework pass in
all 101 GlobalCommT items (100 old + 1 new) so that it can commit all of
them in one transaction. That is more efficient than 101 separate
transactions.
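
To make the scenario concrete, here is a rough Java sketch of how the
framework side could accumulate uncommitted items and hand the whole backlog
to the sink on each checkpoint. Everything in it is hypothetical and only for
illustration: CommitDriver, onCheckpointComplete, and the simplified
GlobalCommitter and CommitResult types are not the FLIP's actual API, and
throwing on FAILURE is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCommitSketch {

    /** Simplified, hypothetical result type using the three outcomes named in the thread. */
    enum CommitResult { SUCCESS, FAILURE, RETRY }

    /** Hypothetical stand-in for the commit(List) method under discussion. */
    interface GlobalCommitter<GCommT> {
        CommitResult commit(List<GCommT> globalCommittables);
    }

    /** Hypothetical framework-side driver that remembers everything not yet committed. */
    static final class CommitDriver<GCommT> {
        private final GlobalCommitter<GCommT> committer;
        private final List<GCommT> pending = new ArrayList<>();

        CommitDriver(GlobalCommitter<GCommT> committer) {
            this.committer = committer;
        }

        /** Called once per completed checkpoint with the new global committable. */
        CommitResult onCheckpointComplete(GCommT newCommittable) {
            pending.add(newCommittable);
            // Hand over the whole backlog so the sink can use a single transaction.
            CommitResult result = committer.commit(new ArrayList<>(pending));
            if (result == CommitResult.SUCCESS) {
                pending.clear();
            } else if (result == CommitResult.FAILURE) {
                // Assumption: a hard failure is surfaced to the caller.
                throw new IllegalStateException("global commit failed");
            }
            // RETRY: keep the backlog and try again on the next checkpoint.
            return result;
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        int[] transactions = {0};
        int[] lastBatchSize = {0};
        boolean[] serviceUp = {false};

        // Fake sink: the metadata service is "down" for checkpoints 1-100.
        CommitDriver<String> driver = new CommitDriver<>(batch -> {
            if (!serviceUp[0]) {
                return CommitResult.RETRY;
            }
            transactions[0]++;
            lastBatchSize[0] = batch.size();
            return CommitResult.SUCCESS;
        });

        for (int cp = 1; cp <= 100; cp++) {
            driver.onCheckpointComplete("cp-" + cp);
        }
        serviceUp[0] = true;
        driver.onCheckpointComplete("cp-101");

        // Prints: 1 transaction(s), batch size 101
        System.out.println(transactions[0] + " transaction(s), batch size " + lastBatchSize[0]);
    }
}
```

With the fake sink above, the 100 failed attempts leave 100 items pending,
and the commit for checkpoint 101 receives all 101 items in one call.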

Maybe the GlobalCommitter#commit semantics should be to give the sink all
uncommitted GlobalCommT items and let the sink implementation decide whether
to retry them one by one or in a single transaction. That could mean we need
to expand the CommitResult interface (e.g. a list for each result type:
SUCCESS, FAILURE, RETRY). We can also start with the simple enum-style
result for the whole list for now. If we later need to break the
experimental API, it is not a big deal, since we would only need to update
a few sink implementations.
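
One possible shape for such an expanded result, purely as a sketch: the
CommitResult class below, its three list fields, and the helper
commitInOneTransaction are hypothetical names made up for illustration, not
something the FLIP defines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DetailedCommitResultSketch {

    /** Hypothetical expanded result: one list per outcome (SUCCESS, FAILURE, RETRY). */
    static final class CommitResult<GCommT> {
        final List<GCommT> succeeded;
        final List<GCommT> failed;
        final List<GCommT> toRetry;

        CommitResult(List<GCommT> succeeded, List<GCommT> failed, List<GCommT> toRetry) {
            // Defensive copies so the result cannot change after it is returned.
            this.succeeded = Collections.unmodifiableList(new ArrayList<>(succeeded));
            this.failed = Collections.unmodifiableList(new ArrayList<>(failed));
            this.toRetry = Collections.unmodifiableList(new ArrayList<>(toRetry));
        }

        static <T> CommitResult<T> allSucceeded(List<T> items) {
            return new CommitResult<>(items, Collections.emptyList(), Collections.emptyList());
        }

        static <T> CommitResult<T> retryAll(List<T> items) {
            return new CommitResult<>(Collections.emptyList(), Collections.emptyList(), items);
        }
    }

    /**
     * Hypothetical sink-side strategy: commit the whole list in a single
     * transaction, or ask for the whole list to be retried if that fails.
     * The boolean stands in for whether the metadata service is reachable.
     */
    static <T> CommitResult<T> commitInOneTransaction(List<T> uncommitted, boolean metadataServiceUp) {
        return metadataServiceUp
                ? CommitResult.allSucceeded(uncommitted)
                : CommitResult.retryAll(uncommitted);
    }

    public static void main(String[] args) {
        List<String> backlog = Arrays.asList("cp-1", "cp-2", "cp-3");

        CommitResult<String> duringOutage = commitInOneTransaction(backlog, false);
        System.out.println("to retry: " + duringOutage.toRetry.size());

        CommitResult<String> afterRecovery = commitInOneTransaction(backlog, true);
        System.out.println("succeeded: " + afterRecovery.succeeded.size());
    }
}
```

A sink that prefers item-by-item retries could fill the three lists
individually instead, which is the flexibility the per-type lists would buy
over a single enum for the whole list.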

Thanks,
Steven

On Fri, Sep 25, 2020 at 5:56 AM Steven Wu <stevenz...@gmail.com> wrote:

> > 1. The frame can not know which `GlobalCommT` to retry if we use the
> > List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> > 2. Of course we can let the `commit` return more detailed info but it
> > might be too complicated.
>
> If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
> to be retried. E.g. we have some outage with metadata service, commits for
> checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. We don't
> want to commit them one by one. It is faster to commit the whole list as
> one batch.
>
> > 3. On the other hand, I think only when restoring IcebergSink needs a
> > collection of `GlobalCommT` and giving back another collection of
> > `GlobalCommT` that are not committed
>
> That is when the job restarted due to failure or deployment.
>
>
> On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, all
>>
>> From the above discussion we could find that FLIP focuses on providing an
>> unified transactional sink API. So I updated the FLIP's title to "Unified
>> Transactional Sink API". But I found that the old link could not be opened
>> again.
>>
>> I would update the link[1] here. Sorry for the inconvenience.
>>
>> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>> > Hi, Steven
>> >
>> > >>I also have a clarifying question regarding the WriterStateT. Since
>> > >>IcebergWriter won't need to checkpoint any state, should we set it to
>> > *Void*
>> > >>type? Since getWriterStateSerializer() returns Optional, that is clear
>> > and
>> > >>we can return Optional.empty().
>> >
>> > Yes I think you could do it. If you return Optional.empty() we would
>> > ignore all the state you return.
>> >
>> > Best,
>> > Guowei
>> >
>> >
>> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
>> >
>> >> Hi,Steven
>> >>
>> >> Thank you for reading the FLIP so carefully.
>> >> 1. The frame can not know which `GlobalCommT` to retry if we use the
>> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
>> >> 2. Of course we can let the `commit` return more detailed info but it
>> >> might be too complicated.
>> >> 3. On the other hand, I think only when restoring IcebergSink needs a
>> >> collection of `GlobalCommT` and giving back another collection of
>> >> `GlobalCommT` that are not committed.
>> >>
>> >> Best,
>> >> Guowei
>> >>
>> >>
>> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com>
>> wrote:
>> >>
>> >>> Guowei,
>> >>>
>> >>> Thanks a lot for updating the wiki page. It looks great.
>> >>>
>> >>> I noticed one inconsistency in the wiki with your last summary email
>> for
>> >>> GlobalCommitter interface. I think the version in the summary email is
>> >>> the
>> >>> intended one, because rollover from previous failed commits can
>> >>> accumulate
>> >>> a list.
>> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>> >>> =>
>> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
>> >>> summary email
>> >>>
>> >>> I also have a clarifying question regarding the WriterStateT. Since
>> >>> IcebergWriter won't need to checkpoint any state, should we set it to
>> >>> *Void*
>> >>> type? Since getWriterStateSerializer() returns Optional, that is clear
>> >>> and
>> >>> we can return Optional.empty().
>> >>>
>> >>> Thanks,
>> >>> Steven
>> >>>
>> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com>
>> wrote:
>> >>>
>> >>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
>> >>> comments are
>> >>> > welcome.
>> >>> >
>> >>> > Best,
>> >>> > Guowei
>> >>> >
>> >>> >
>> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <
>> aljos...@apache.org>
>> >>> > wrote:
>> >>> >
>> >>> > > Yes, that sounds good! I'll probably have some comments on the
>> FLIP
>> >>> > > about the names of generic parameters and the Javadoc but we can
>> >>> address
>> >>> > > them later or during implementation.
>> >>> > >
>> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result
>> for
>> >>> > > globalCommit() but we can also do that as a later addition.
>> >>> > >
>> >>> > > So I think we're good to go to update the FLIP, do any last minute
>> >>> > > changes and then vote.
>> >>> > >
>> >>> > > Best,
>> >>> > > Aljoscha
>> >>> > >
>> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
>> >>> > > > Hi, all
>> >>> > > >
>> >>> > > > Thank everyone very much for your ideas and suggestions. I would
>> >>> try to
>> >>> > > > summarize again the consensus :). Correct me if I am wrong or
>> >>> > > misunderstand
>> >>> > > > you.
>> >>> > > >
>> >>> > > > ## Consensus-1
>> >>> > > >
>> >>> > > > 1. The motivation of the unified sink API is to decouple the
>> sink
>> >>> > > > implementation from the different runtime execution mode.
>> >>> > > > 2. The initial scope of the unified sink API only covers the
>> file
>> >>> > system
>> >>> > > > type, which supports the real transactions. The FLIP focuses
>> more
>> >>> on
>> >>> > the
>> >>> > > > semantics the new sink api should support.
>> >>> > > > 3. We prefer the first alternative API, which could give the
>> >>> framework
>> >>> > a
>> >>> > > > greater opportunity to optimize.
>> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which
>> would
>> >>> be
>> >>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
>> >>> method.
>> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order
>> to
>> >>> be
>> >>> > more
>> >>> > > > focused.
>> >>> > > >
>> >>> > > > ## Consensus-2
>> >>> > > >
>> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes
>> >>> two
>> >>> > > > aspects. 1. The same sink implementation would work for both the
>> >>> batch
>> >>> > > and
>> >>> > > > stream execution mode. 2. In the long run we should give the
>> sink
>> >>> > > developer
>> >>> > > > the ability of building “arbitrary” topologies. But for
>> Flink-1.12
>> >>> we
>> >>> > > > should be more focused on only satisfying the S3/HDFS/Iceberg
>> sink.
>> >>> > > > 2. Because the batch execution mode does not have the normal
>> >>> checkpoint
>> >>> > > the
>> >>> > > > sink developer should not depend on it any more if we want a
>> >>> unified
>> >>> > > sink.
>> >>> > > > 3. We can benefit by providing an asynchronous `Writer` version.
>> >>> But
>> >>> > > > because the unified sink is already very complicated, we don’t
>> add
>> >>> this
>> >>> > > in
>> >>> > > > the first version.
>> >>> > > >
>> >>> > > >
>> >>> > > > According to these consensus I would propose the first version
>> of
>> >>> the
>> >>> > new
>> >>> > > > sink api as follows. What do you think? Any comments are
>> welcome.
>> >>> > > >
>> >>> > > > /**
>> >>> > > >   * This interface lets the sink developer build a simple
>> >>> transactional
>> >>> > > sink
>> >>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
>> >>> > > >   * This sink topology includes one {@link Writer} + one {@link
>> >>> > > Committer} +
>> >>> > > > one {@link GlobalCommitter}.
>> >>> > > >   * The {@link Writer} is responsible for producing the
>> >>> committable.
>> >>> > > >   * The {@link Committer} is responsible for committing a single
>> >>> > > > committables.
>> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>> >>> > > aggregated
>> >>> > > > committable, which we called global committables.
>> >>> > > >   *
>> >>> > > >   * But both the {@link Committer} and the {@link
>> GlobalCommitter}
>> >>> are
>> >>> > > > optional.
>> >>> > > >   */
>> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>> >>> > > >
>> >>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext
>> >>> > > initContext);
>> >>> > > >
>> >>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext
>> >>> > > initContext,
>> >>> > > > List<WriterS> states);
>> >>> > > >
>> >>> > > >          Optional<Committer<CommT>> createCommitter();
>> >>> > > >
>> >>> > > >          Optional<GlobalCommitter<CommT, GCommT>>
>> >>> > > createGlobalCommitter();
>> >>> > > >
>> >>> > > >          SimpleVersionedSerializer<CommT>
>> >>> getCommittableSerializer();
>> >>> > > >
>> >>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
>> >>> > > > getGlobalCommittableSerializer();
>> >>> > > > }
>> >>> > > >
>> >>> > > > /**
>> >>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>> >>> > > aggregated
>> >>> > > > committable, which we called global committables.
>> >>> > > >   */
>> >>> > > > interface GlobalCommitter<CommT, GCommT> {
>> >>> > > >
>> >>> > > >          /**
>> >>> > > >           * This method is called when restoring from a
>> failover.
>> >>> > > >           * @param globalCommittables the global committables
>> that
>> >>> are
>> >>> > > not
>> >>> > > > committed in the previous session.
>> >>> > > >           * @return the global committables that should be
>> >>> committed
>> >>> > > again
>> >>> > > > in the current session.
>> >>> > > >           */
>> >>> > > >          List<GCommT> filterRecoveredCommittables(List<GCommT>
>> >>> > > > globalCommittables);
>> >>> > > >
>> >>> > > >          /**
>> >>> > > >           * Compute an aggregated committable from a collection
>> of
>> >>> > > > committables.
>> >>> > > >           * @param committables a collection of committables
>> that
>> >>> are
>> >>> > > needed
>> >>> > > > to combine
>> >>> > > >           * @return an aggregated committable
>> >>> > > >           */
>> >>> > > >          GCommT combine(List<CommT> committables);
>> >>> > > >
>> >>> > > >          void commit(List<GCommT> globalCommittables);
>> >>> > > >
>> >>> > > >          /**
>> >>> > > >           * There are no committables any more.
>> >>> > > >           */
>> >>> > > >          void endOfInput();
>> >>> > > > }
>> >>> > > >
>> >>> > > > Best,
>> >>> > > > Guowei
>> >>> > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>>
>
