Hi, all

Thank everyone very much for your ideas and suggestions. I would try to
summarize again the consensus :). Correct me if I am wrong or misunderstand
you.

## Consensus-1

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution mode.
2. The initial scope of the unified sink API only covers the file system
type, which supports the real transactions. The FLIP focuses more on the
semantics the new sink api should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.

## Consensus-2

1. What should the “Unified Sink API” support/cover? It includes two
aspects. 1. The same sink implementation would work for both the batch and
stream execution mode. 2. In the long run we should give the sink developer
the ability of building “arbitrary” topologies. But for Flink-1.12 we
should be more focused on only satisfying the S3/HDFS/Iceberg sink.
2. Because the batch execution mode does not have the normal checkpoint the
sink developer should not depend on it any more if we want a unified sink.
3. We can benefit by providing an asynchronous `Writer` version. But
because the unified sink is already very complicated, we don’t add this in
the first version.


According to these consensus I would propose the first version of the new
sink api as follows. What do you think? Any comments are welcome.

/**
 * This interface lets the sink developer build a simple transactional sink
topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} +
one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single
committables.
 * The {@link GlobalCommitter} is responsible for committing an aggregated
committable, which we called global committables.
 *
 * But both the {@link Committer} and the {@link GlobalCommitter} are
optional.
 */
interface TSink<IN, CommT, GCommT, WriterS> {

        Writer<IN, CommT, WriterS> createWriter(InitContext initContext);

        Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
List<WriterS> states);

        Optional<Committer<CommT>> createCommitter();

        Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

        SimpleVersionedSerializer<CommT> getCommittableSerializer();

        Optional<SimpleVersionedSerializer<GCommT>>
getGlobalCommittableSerializer();
}

/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated
committable, which we called global committables.
 */
interface GlobalCommitter<CommT, GCommT> {

        /**
         * This method is called when restoring from a failover.
         * @param globalCommittables the global committables that are not
committed in the previous session.
         * @return the global committables that should be committed again
in the current session.
         */
        List<GCommT> filterRecoveredCommittables(List<GCommT>
globalCommittables);

        /**
         * Compute an aggregated committable from a collection of
committables.
         * @param committables a collection of committables that are needed
to combine
         * @return an aggregated committable
         */
        GCommT combine(List<CommT> committables);

        void commit(List<GCommT> globalCommittables);

        /**
         * There are no committables any more.
         */
        void endOfInput();
}

Best,
Guowei


On Wed, Sep 23, 2020 at 12:03 PM Guowei Ma <guowei....@gmail.com> wrote:

> >> I think we should go with something like
>
> >> List<GlobalCommitT> filterRecoveredCommittables(List<>)
>
> >> to keep things simple. This should also be easy to do from the framework
> >> side and then the sink doesn't need to do any custom state handling.
>
> I second Aljoscha's  proposal. For the first version there is already much
> stuff to do.
> For now I think it would be satisfied with IceBerg Sink.
>
> Best,
> Guowei
>
>
> On Tue, Sep 22, 2020 at 10:54 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> I think we should go with something like
>>
>> List<GlobalCommitT> filterRecoveredCommittables(List<>)
>>
>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.
>>
>> Best,
>> Aljoscha
>>
>> On 22.09.20 16:03, Steven Wu wrote:
>> > Previous APIs discussed have been trying to do more in the framework.
>> If we
>> > take a different approach to a lighter framework, these sets of
>> > minimal APIs are probably good enough. Sink can handle the bookkeeping,
>> > merge, retry logics.
>> >
>> > /**
>> >   * CommT is the DataFile in Iceberg
>> >   * GlobalCommT is the checkpoint data type, like ManifestFile in
>> Iceberg
>> > */
>> > interface GlobalCommitter<CommT, GlobalCommT> {
>> >
>> >    void collect(CommT);
>> >
>> >    void commit();
>> >
>> >    List<GlobalCommT> snapshotState();
>> >
>> >    // this is just a callback to sink so that it can perform filter and
>> > retain the uncommitted GlobalCommT in the internal bookkeeping
>> >    void recoveredCommittables(List<GlobalCommT>) ;
>> > }
>> >
>> > The most important need from the framework is to run GlobalCommitter in
>> the
>> > jobmanager. It involves the topology creation, checkpoint handling,
>> > serializing the executions of commit() calls etc.
>> >
>> > Thanks,
>> > Steven
>> >
>> > On Tue, Sep 22, 2020 at 6:39 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >
>> >> It is fine to leave the CommitResult/RETRY outside the scope of
>> framework.
>> >> Then the framework might need to provide some hooks in the
>> >> checkpoint/restore logic. because the commit happened in the post
>> >> checkpoint completion step, sink needs to update the internal state
>> when
>> >> the commit is successful so that the next checkpoint won't include the
>> >> committed GlobalCommT.
>> >>
>> >> Maybe GlobalCommitter can have an API like this?
>> >>> List<GlobalCommT> snapshotState();
>> >>
>> >> But then we still need the recover API if we don't let sink directly
>> >> manage the state.
>> >>> List<GlobalCommT> recoverCommittables(List<GlobalCommT>)
>> >>
>> >> Thanks,
>> >> Steven
>> >>
>> >> On Tue, Sep 22, 2020 at 6:33 AM Aljoscha Krettek <aljos...@apache.org>
>> >> wrote:
>> >>
>> >>> On 22.09.20 13:26, Guowei Ma wrote:
>> >>>> Actually I am not sure adding `isAvailable` is enough. Maybe it is
>> not.
>> >>>> But for the initial version I hope we could make the sink api sync
>> >>> because
>> >>>> there is already a lot of stuff that has to finish. :--)
>> >>>
>> >>> I agree, for the first version we should stick to a simpler
>> synchronous
>> >>> interface.
>> >>>
>> >>> Aljoscha
>> >>>
>> >>
>> >
>>
>>

Reply via email to