Hi, all

From the above discussion, we can see that the FLIP focuses on providing a
unified transactional sink API, so I updated the FLIP's title to "Unified
Transactional Sink API". However, the old link can no longer be opened.

Here is the updated link [1]. Sorry for the inconvenience.

[1] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Steven
>
> >>I also have a clarifying question regarding the WriterStateT. Since
> >>IcebergWriter won't need to checkpoint any state, should we set it to
> >>*Void* type? Since getWriterStateSerializer() returns Optional, that is
> >>clear and we can return Optional.empty().
>
> Yes, I think you can do that. If you return Optional.empty(), the framework
> ignores any state the writer returns.
>
> Best,
> Guowei
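A minimal sketch of the answer above, assuming a trimmed-down sink interface. `SinkWriterStateSpec`, `StateSerializer`, `IcebergLikeSink` and `StateSnapshotter` are hypothetical names for illustration only (the FLIP itself uses Flink's `SimpleVersionedSerializer`). It shows a stateless writer declaring `Void` as its state type and returning `Optional.empty()`, and a framework-side snapshot step that skips writer state when no serializer is present.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a state serializer (not the Flink API).
interface StateSerializer<T> {
    byte[] serialize(T state);
}

// Hypothetical, trimmed-down sink: only the writer-state part is modeled.
interface SinkWriterStateSpec<WriterStateT> {
    Optional<StateSerializer<WriterStateT>> getWriterStateSerializer();
}

// An Iceberg-like sink whose writer keeps no state, so WriterStateT = Void.
class IcebergLikeSink implements SinkWriterStateSpec<Void> {
    @Override
    public Optional<StateSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty(); // nothing to checkpoint
    }
}

// Sketch of the framework side: returns how many writer states were persisted.
class StateSnapshotter {
    static <S> int snapshot(SinkWriterStateSpec<S> sink, List<S> writerStates) {
        Optional<StateSerializer<S>> serializer = sink.getWriterStateSerializer();
        if (!serializer.isPresent()) {
            return 0; // no serializer: whatever the writer returns is ignored
        }
        int persisted = 0;
        for (S state : writerStates) {
            serializer.get().serialize(state); // a real runtime would store these bytes
            persisted++;
        }
        return persisted;
    }
}
```

With this shape, a stateful sink simply returns `Optional.of(serializer)` and its states are written as usual.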
>
>
> On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Steven
>>
>> Thank you for reading the FLIP so carefully.
>> 1. The framework cannot know which `GlobalCommT` to retry if `commit` takes
>> a List<GlobalCommT> as its parameter and returns `RETRY`.
>> 2. Of course, we could let `commit` return more detailed information, but
>> that might be too complicated.
>> 3. On the other hand, I think the IcebergSink only needs to handle a
>> collection of `GlobalCommT` when restoring: it takes the recovered
>> collection and gives back another collection of the `GlobalCommT` that
>> are not yet committed.
>>
>> Best,
>> Guowei
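Point 1 above can be illustrated with a small sketch, assuming a per-item `commit` that returns a result enum. The values follow the FAIL/RETRY/SUCCESS result mentioned later in the thread; `CommitResult` as written here and `PerItemCommitLoop` are hypothetical. Because each global committable gets its own result, the caller knows exactly which ones to retry, which a single result for a whole `List<GlobalCommT>` cannot express.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical result type modeled on the FAIL/RETRY/SUCCESS idea in the thread.
enum CommitResult { SUCCESS, FAIL, RETRY }

class PerItemCommitLoop {
    // Commits each global committable on its own and returns the ones that
    // asked to be retried, so only those are re-attempted later.
    static <G> List<G> commitAll(List<G> pending, Function<G, CommitResult> commitOne) {
        List<G> toRetry = new ArrayList<>();
        for (G globalCommittable : pending) {
            CommitResult result = commitOne.apply(globalCommittable);
            if (result == CommitResult.RETRY) {
                toRetry.add(globalCommittable);
            } else if (result == CommitResult.FAIL) {
                throw new IllegalStateException("Unrecoverable commit failure: " + globalCommittable);
            }
            // SUCCESS: nothing left to do for this item
        }
        return toRetry;
    }
}
```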
>>
>>
>> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Guowei,
>>>
>>> Thanks a lot for updating the wiki page. It looks great.
>>>
>>> I noticed one inconsistency between the wiki and your last summary email
>>> regarding the GlobalCommitter interface. I think the version in the summary
>>> email is the intended one, because rollover from previous failed commits
>>> can accumulate a list.
>>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>>> =>
>>> CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
>>>
>>> I also have a clarifying question regarding the WriterStateT. Since
>>> IcebergWriter won't need to checkpoint any state, should we set it to
>>> *Void* type? Since getWriterStateSerializer() returns Optional, that is
>>> clear and we can return Optional.empty().
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>> > Thanks, Aljoscha, for your suggestion. I have updated the FLIP. Any
>>> > comments are welcome.
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> >
>>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>> >
>>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>>> > > about the names of generic parameters and the Javadoc, but we can
>>> > > address them later or during implementation.
>>> > >
>>> > > I also think that we probably need the FAIL, RETRY, SUCCESS result for
>>> > > globalCommit(), but we can also do that as a later addition.
>>> > >
>>> > > So I think we're good to go: update the FLIP, make any last-minute
>>> > > changes, and then vote.
>>> > >
>>> > > Best,
>>> > > Aljoscha
>>> > >
>>> > > On 23.09.20 06:13, Guowei Ma wrote:
>>> > > > Hi, all
>>> > > >
>>> > > > Thank you all very much for your ideas and suggestions. Let me try to
>>> > > > summarize the consensus again :). Correct me if I am wrong or have
>>> > > > misunderstood you.
>>> > > >
>>> > > > ## Consensus-1
>>> > > >
>>> > > > 1. The motivation of the unified sink API is to decouple the sink
>>> > > > implementation from the different runtime execution modes.
>>> > > > 2. The initial scope of the unified sink API only covers the file system
>>> > > > type, which supports real transactions. The FLIP focuses more on the
>>> > > > semantics the new sink API should support.
>>> > > > 3. We prefer the first alternative API, which could give the framework
>>> > > > a greater opportunity to optimize.
>>> > > > 4. The `Writer` needs a new method, `prepareCommit`, which would be
>>> > > > called from `prepareSnapshotPreBarrier`, and the `Flush` method is
>>> > > > removed.
>>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
>>> > > > more focused.
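Item 4 can be sketched as follows, assuming a simplified writer without writer state. `SimpleWriter`, `BufferingFileWriter` and `WriterOperatorSketch` are hypothetical names, and the "pending file" is just a string. The operator asks the writer for its committables in `prepareSnapshotPreBarrier`, before the barrier moves on, so no separate `flush` call is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified writer shape (writer state omitted for brevity).
interface SimpleWriter<IN, CommT> {
    void write(IN element);

    // Returns everything that is ready to be committed.
    List<CommT> prepareCommit();
}

// Buffers records into an in-progress "file"; prepareCommit closes it and
// hands back a description of it as the committable (a stand-in for a pending file).
class BufferingFileWriter implements SimpleWriter<String, String> {
    private final List<String> inProgress = new ArrayList<>();
    private int nextPart = 0;

    @Override
    public void write(String element) {
        inProgress.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!inProgress.isEmpty()) {
            committables.add("part-" + nextPart + " (" + inProgress.size() + " records)");
            nextPart++;
            inProgress.clear();
        }
        return committables;
    }
}

// Sketch of the operator side of a checkpoint.
class WriterOperatorSketch {
    private final SimpleWriter<String, String> writer;
    final List<String> sentToCommitter = new ArrayList<>();

    WriterOperatorSketch(SimpleWriter<String, String> writer) {
        this.writer = writer;
    }

    void processElement(String element) {
        writer.write(element);
    }

    // checkpointId is unused here; a real operator would tag committables with it.
    void prepareSnapshotPreBarrier(long checkpointId) {
        sentToCommitter.addAll(writer.prepareCommit());
    }
}
```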
>>> > > >
>>> > > > ## Consensus-2
>>> > > >
>>> > > > 1. What should the “Unified Sink API” support/cover? It includes two
>>> > > > aspects: (1) the same sink implementation should work for both the
>>> > > > batch and stream execution modes; (2) in the long run, we should give
>>> > > > the sink developer the ability to build “arbitrary” topologies. But for
>>> > > > Flink 1.12, we should focus only on satisfying the S3/HDFS/Iceberg sinks.
>>> > > > 2. Because the batch execution mode does not have normal checkpoints,
>>> > > > the sink developer should no longer depend on them if we want a unified
>>> > > > sink.
>>> > > > 3. We could benefit from providing an asynchronous `Writer` version.
>>> > > > But because the unified sink is already very complicated, we won't add
>>> > > > it in the first version.
>>> > > >
>>> > > >
>>> > > > Based on this consensus, I would propose the first version of the new
>>> > > > sink API as follows. What do you think? Any comments are welcome.
>>> > > >
>>> > > > /**
>>> > > >   * This interface lets the sink developer build a simple transactional
>>> > > >   * sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
>>> > > >   * This sink topology includes one {@link Writer} + one {@link Committer}
>>> > > >   * + one {@link GlobalCommitter}.
>>> > > >   * The {@link Writer} is responsible for producing the committable.
>>> > > >   * The {@link Committer} is responsible for committing a single
>>> > > >   * committable.
>>> > > >   * The {@link GlobalCommitter} is responsible for committing an aggregated
>>> > > >   * committable, which we call the global committable.
>>> > > >   *
>>> > > >   * Both the {@link Committer} and the {@link GlobalCommitter} are
>>> > > >   * optional.
>>> > > >   */
>>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>>> > > >
>>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
>>> > > >
>>> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
>>> > > >                  List<WriterS> states);
>>> > > >
>>> > > >          Optional<Committer<CommT>> createCommitter();
>>> > > >
>>> > > >          Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
>>> > > >
>>> > > >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
>>> > > >
>>> > > >          Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
>>> > > > }
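Since `createCommitter()` and `createGlobalCommitter()` both return `Optional`, a runtime could assemble the operator chain from whichever parts a `TSink` provides. The sketch below is hypothetical (`TopologySketch` and the string labels are illustrative, not Flink API) and only models the "one Writer + optional Committer + optional GlobalCommitter" shape described in the Javadoc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class TopologySketch {
    // The Writer is always present; the committers are added only if the
    // sink supplied them (i.e. the Optional is non-empty).
    static List<String> operatorChain(Optional<?> committer, Optional<?> globalCommitter) {
        List<String> chain = new ArrayList<>();
        chain.add("Writer");
        if (committer.isPresent()) {
            chain.add("Committer");
        }
        if (globalCommitter.isPresent()) {
            chain.add("GlobalCommitter");
        }
        return chain;
    }
}
```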
>>> > > >
>>> > > > /**
>>> > > >   * The {@link GlobalCommitter} is responsible for committing an aggregated
>>> > > >   * committable, which we call the global committable.
>>> > > >   */
>>> > > > interface GlobalCommitter<CommT, GCommT> {
>>> > > >
>>> > > >          /**
>>> > > >           * This method is called when restoring from a failover.
>>> > > >           * @param globalCommittables the global committables that were not
>>> > > >           * committed in the previous session.
>>> > > >           * @return the global committables that should be committed again
>>> > > >           * in the current session.
>>> > > >           */
>>> > > >          List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
>>> > > >
>>> > > >          /**
>>> > > >           * Computes an aggregated committable from a collection of
>>> > > >           * committables.
>>> > > >           * @param committables a collection of committables that need to be
>>> > > >           * combined
>>> > > >           * @return an aggregated committable
>>> > > >           */
>>> > > >          GCommT combine(List<CommT> committables);
>>> > > >
>>> > > >          void commit(List<GCommT> globalCommittables);
>>> > > >
>>> > > >          /**
>>> > > >           * Signals that there are no more committables.
>>> > > >           */
>>> > > >          void endOfInput();
>>> > > > }
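To make the proposed contract concrete, here is a hedged sketch of an Iceberg-like `GlobalCommitter`, assuming an in-memory "table" that remembers which file sets it has committed. The interface is restated so the sketch compiles on its own; `InMemoryTableCommitter` and its helper methods are hypothetical, and a real Iceberg sink would consult table metadata rather than a `HashSet`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Restated from the proposal so this sketch is self-contained.
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical committer: CommT is a data-file name, GCommT is the list of
// files that make up one table commit.
class InMemoryTableCommitter implements GlobalCommitter<String, List<String>> {
    // Stand-in for table metadata: the file sets already committed.
    private final Set<List<String>> committed = new HashSet<>();
    private boolean finished = false;

    @Override
    public List<List<String>> filterRecoveredCommittables(List<List<String>> globalCommittables) {
        List<List<String>> stillPending = new ArrayList<>();
        for (List<String> g : globalCommittables) {
            if (!committed.contains(g)) { // skip what the table already has
                stillPending.add(g);
            }
        }
        return stillPending;
    }

    @Override
    public List<String> combine(List<String> committables) {
        return new ArrayList<>(committables); // aggregate into one table commit
    }

    @Override
    public void commit(List<List<String>> globalCommittables) {
        committed.addAll(globalCommittables); // re-adding an existing set is a no-op
    }

    @Override
    public void endOfInput() {
        finished = true;
    }

    int committedCount() { return committed.size(); }

    boolean isFinished() { return finished; }
}
```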
>>> > > >
>>> > > > Best,
>>> > > > Guowei
>>> > >
>>> > >
>>> >
>>>
>>
