> I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.


@Guowei Ma <guowei....@gmail.com>  It is undesirable to do the dedup check
in the `commit` call, because it happens for each checkpoint cycle. We only
need to do the de-dup check one time when restoring GlobalCommT list from
the checkpoint.


Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to
let sink implementations know the recovered GlobalCommT list, it is
probably not a sufficient API. For the Iceberg sink, we can try to
implement the de-dup check  inside the `recoveredGlobalCommittables` method
and commit any uncommitted GlobalCommT items. But how do we handle the
commit failed?


One alternative is to allow sink implementations to override "Li
st<GlobalCommT> recoverGlobalCommittables()". Framework handles the
checkpoint/state, and sink implementations can further customize the
restored list with de-dup check and filtering. Recovered uncommitted
GlobalCommT list will be committed in the next cycle. It is the same
rollover strategy for commit failure handling that we have been discussing.


## topologies


Regarding the topology options, if we agree that there is no one size fit
for all, we can let sink implementations choose the best topology. Maybe
the framework can provide 2-3 pre-defined topology implementations to help
the sinks.




On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma <guowei....@gmail.com> wrote:

> I would like to summarize the file type sink in the thread and their
> possible topologies.  I also try to give pros and cons of every topology
> option. Correct me if I am wrong.
>
> ### FileSink
>
> Topology Option: TmpFileWriter + Committer.
>
> ### IceBerg Sink
>
> #### Topology Option1: `DataFileWriter` + `GlobalCommitterV0`.
> Pro:
> 1. Same group has some id.
> Cons:
> 1. May limit users’ optimization space;
> 2. The topology does not meet the Hive’s requirements.
>
> #### Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
> Pro:
> 1. User has the opportunity to optimize the implementation of idempotence
> Cons:
> 2. Make the GlobalCommit more complicated.
> 3. The topology does not meets the Hive’s requirements
>
> ### Topology Option3: DataFileWriter + AggWriter + Committer
>
> Pros:
> 1. Use two basic `Writer` & `Commiter` to meet the IceBerge’s requirements.
> 2. Opportunity to optimize the implementation of idempotence
> 3. The topology meets the Hive’s requirements.(See flowing)
> Con:
> 1. It introduce a relative complex topologies
>
> ## HiveSink
>
> ### Topology Option1: `TmpFileWriter` + `Committer` + `GlobalCommitterV2`.
> Pro:
> 1. Could skip the cleanup problem at first.
> Con:
> 1. This style topology does not meet the CompactHiveSink requirements.
>
> ### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
> `Committer`
> Pros
> 1. Could skip the clean up problem at first.
> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> Cons
> 1. This style topology does not meet the CompactHiveSink requirements.
> 2. There are two general `Committers` in the topology. For Hive’s case
> there might be no problem. But there might be a problem in 1.12. For
> example where to execute the sub-topology following the `Committer` in
> batch execution mode for the general case. Because the topology is built
> from `Writer` and `Committer` we might put all the sub-topology in the
> OperatorCoordinator. But if the topology is too complicated it might be
> very complicated. See following.
>
> ### Topology Option3 `FileWriter` + `AggWriter` + `Committer`
> Pro
> 1. There is only one general committer.
> Cons
> 1. It has to consider the cleanup problem. (In theory both the Option1 and
> Option2 need to cleanup)
> 2. This style topology does not meet the CompactHiveSink requirements.
> 3. Have to figure out how to make the current version compatible.
>
> ### CompactHiveSink/MergeHiveSink
>
> #### Topology Option1 `TmpFileWriter` + `Committer` + `MergerCoordinator`
> + `MergeWriter` + `GlobalCommiterV2`
> Pro
> 1. Could skip the clean up problem at first.
> Cons
> 2. Where to execute the sub-topology following the `Committer`.
>
> #### Topology Option2 `TmpFileWriter` + `Committer` + `MergerCoordinator`
> + `MergeWriter` + AggWriter + Committer
> Pros
> 1. Could skip the clean up problem at first
> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> Con
> 1. Where to execute the sub-topology following the `Committer`.
>
> ### Option3 FileWriter + MergeCoordinator + MergeFileWriter + Writer(Agg)
> + Committer
> Pro
> 1. There is only one committer. It is very easy to support in the batch
> execution mode.
> Con
> 2. It has to consider the cleanup problem. (In theory both the Option1 and
> Option2 need to cleanup)
>
>
> ### Summary
>
> From above we could divide the sink topology into two parts:
> 1. Write topology.
> 2. And One committer
>
> So we could provide a unified sink API looks like the following:
>
> public interface Sink<CommT> {
>         List<Writer<?, ?>> getWriters();
>         Committer<CommT> createCommitter()
> }
>
> In the long run maybe we could give the user more powerful ability like
> this (Currently some transformation still belongs to runtime):
> Sink<CommT> {
>         Transformation<CommT> createWriteTopology();
>          CommitFunction<CommT> createCommitter();
> }
>
> Best,
> Guowei
>
>
> On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Stevn
>> I want to make a clarification first, the following reply only considers
>> the Iceberge sink, but does not consider other sinks.  Before make decision
>> we should consider all the sink.I would try to summary all the sink
>> requirments in the next mail
>>
>>
>> >>  run global committer in jobmanager (e.g. like sink coordinator)
>>
>> I think it could be.
>>
>>
>> >> You meant GlobalCommit -> GlobalCommT, right?
>>
>> Yes. Thanks :)
>>
>>
>> >> Is this called when restored from checkpoint/savepoint?
>>
>> Yes.
>>
>>
>> >>Iceberg sink needs to do a dup check here on which GlobalCommT were
>> committed and which weren't. Should it return the filtered/de-duped list of
>> GlobalCommT?
>>
>>
>> I think Iceberg sink needs to do the dedup in the `commit` call. The
>> `recoveredGlobalCommittables` is just for restoring the ids.
>>
>>
>> >> Sink implementation can decide if it wants to commit immediately or
>> just leave
>>
>> I think only the frame knows *when* call the commit function.
>>
>>
>> >>should this be "commit(List<GlobalCommT>)"?
>>
>> It could be. thanks.
>>
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Sep 20, 2020 at 12:11 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> > I prefer to let the developer produce id to dedupe. I think this gives
>>> the developer more opportunity to optimize.
>>>
>>> Thinking about it again, I totally agree with Guowei on this. We don't
>>> really need the framework to generate the unique id for Iceberg sink.
>>> De-dup logic is totally internal to Iceberg sink and should be isolated
>>> inside. My earlier question regarding "commitGlobally(List<GlobalCommT>)
>>> can be concurrent or not" also becomes irrelevant, as long as the framework
>>> handles the GlobalCommT list properly (even with concurrent calls).
>>>
>>> Here are the things where framework can help
>>>
>>>    1. run global committer in jobmanager (e.g. like sink coordinator)
>>>    2. help with checkpointing, bookkeeping, commit failure handling,
>>>    recovery
>>>
>>>
>>> @Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter
>>> interface, I have some clarifying questions.
>>>
>>> > void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
>>>
>>>    1. You meant GlobalCommit -> GlobalCommT, right?
>>>    2. Is this called when restored from checkpoint/savepoint?
>>>    3.  Iceberg sink needs to do a dup check here on which GlobalCommT
>>>    were committed and which weren't. Should it return the filtered/de-duped
>>>    list of GlobalCommT?
>>>    4. Sink implementation can decide if it wants to commit immediately
>>>    or just leave
>>>
>>> > void commit(GlobalCommit globalCommit);
>>>
>>> should this be "commit(List<GlobalCommT>)"?
>>>
>>> Thanks,
>>> Steven
>>>
>>>
>>> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, all
>>>>
>>>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg
>>>> sink
>>>> >>checkpoints the unique id into state during snapshot. It also inserts
>>>> the
>>>> >>unique id into the Iceberg snapshot metadata during commit. When a job
>>>> >>restores the state after failure, it needs to know if the restored
>>>> >>transactions/commits were successful or not. It basically iterates
>>>> through
>>>> >>the list of table snapshots from Iceberg and matches the unique ids
>>>> with
>>>> >>what is stored in Iceberg snapshot metadata.
>>>>
>>>> Thanks Steven for these detailed explanations. It makes me know the
>>>> IceBerg
>>>> better. However, I prefer to let the developer produce id to dedupe. I
>>>> think this gives the developer more opportunity to optimize. You could
>>>> see
>>>> the following for more details. Please correct me if I misunderstand
>>>> you.
>>>>
>>>> >> 3. Whether the `Writer` supports async functionality or not.
>>>> Currently I
>>>> do
>>>> >> not know which sink could benefit from it. Maybe it is just my own
>>>> problem.
>>>>
>>>> >> Here, I don't really know. We can introduce an "isAvailable()" method
>>>> >> and mostly ignore it for now and sinks can just always return true.
>>>> Or,
>>>> >> as an alternative, we don't add the method now but can add it later
>>>> with
>>>> >> a default implementation. Either way, we will probably not take
>>>> >> advantage of the "isAvailable()" now because that would require more
>>>> >> runtime changes.
>>>>
>>>> From the @Pitor's explanation I could see the other benefit that might
>>>> be
>>>> gained in the future. For example decoupling the task number and the
>>>> thread
>>>> number. But I have to admit that introducing `isAvailable` might
>>>> introduce
>>>> some complications in the runtime. You could see my alternative API
>>>> option
>>>> in the following. I believe that we could support such an async sink
>>>> writer
>>>> very easily in the future. What do you think?
>>>>
>>>> >> Yes, this is still tricky. What is the current state, would the
>>>> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve
>>>> both
>>>> >> the Iceberg and Hive cases? I believe Hive is the most tricky one
>>>> here,
>>>> >> but if we introduce the "combine" method on GlobalCommit, that could
>>>> >> serve the same purpose as the "aggregation operation" on the
>>>> individual
>>>> >> files, and we could even execute that "combine" in a distributed way.
>>>> >>We assume that GlobalCommit is a Agg/Combiner?
>>>>
>>>> I would share what possible problems that I am seeing currently and the
>>>> alternative options.
>>>>
>>>> ## IceBerg Sink
>>>>
>>>> ### Concern about generating nonce by framework.
>>>>
>>>> If let the `GlobalCommitter` provide a random nonce for the
>>>> `IceBergSink` I
>>>> think that it might not be efficient.  Because even if there are a very
>>>> small number of committables in the state you still need to iterate all
>>>> the
>>>> iceberg snapshot files to check whether the committable is committed
>>>> already. Even if it is efficient for the IceBergSink it might not be the
>>>> case for other sinks.
>>>>
>>>> If the framework generates auto-increment nonce instead, it might still
>>>> not
>>>> be optimal for users. For example, users might want to use some
>>>> business id
>>>> so that after failover they could query whether the commit is successful
>>>> after failover.
>>>>
>>>> I think users could generate more efficient nonce such as an
>>>> auto-increment
>>>> one. Therefore, it seems to provide more optimization chances if we let
>>>> users to generate the nonce.
>>>>
>>>>
>>>> ### Alternative Option
>>>>
>>>> public interface GlobalCommit<CommT, GlobalCommT> {
>>>>         // provide some runtime context such as
>>>> attempt-id,job-id,task-id.
>>>>         void open(InitContext context);
>>>>
>>>>         // This GlobalCommit would aggregate the committable to a
>>>> GlobalCommit before doing the commit operation.
>>>>         GlobalCommT combine(List<Committable> commitables)
>>>>
>>>>         // This method would be called after committing all the
>>>> GlobalCommit producing in the previous session.
>>>>         void recoveredGlobalCommittables(List<GlobalCommit>
>>>> globalCommits)
>>>>
>>>>         // developer would guarantee the idempotency by himself
>>>>         void commit(GlobalCommit globalCommit);
>>>> }
>>>>
>>>> User could guarantee the idenpointecy himself in a more efficient or
>>>> application specific way. If the user wants the `GlobalCommit` to be
>>>> executed in a distributed way, the user could use the runtime
>>>> information
>>>> to generate the partial order id himself.(We could ignore the clean up
>>>> first)
>>>>
>>>> Currently the sink might be looks like following:
>>>>
>>>> Sink<IN, LC, LCO, GC> {
>>>>         Writer<IN, LC> createWriter();
>>>>         Optional<Committer<LC, LCO>> createCommitter();
>>>>         Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
>>>> }
>>>>
>>>> ## Hive
>>>>
>>>> The HiveSink needs to compute whether a directory is finished or not.
>>>> But
>>>> HiveSink can not use the above `combine` method to decide whether a
>>>> directory is finished or not.
>>>>
>>>> For example we assume that whether the directory is finished or not is
>>>> decided by the event time. There might be a topology that the source and
>>>> sink are forward. The event time might be different in different
>>>> instances
>>>> of the `writer`. So the GlobalCommit’s combine can not produce a
>>>> GlobalCommT when the snapshot happens.
>>>>
>>>> In addition to the above case we should also consider the unaligned
>>>> checkpoint. Because the watermark does not skip. So there might be the
>>>> same
>>>> problem in the unaligned checkpoint.
>>>>
>>>> ### Option1:
>>>>
>>>> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
>>>>         // provide some runtime context such as
>>>> attempt-id,job-id,task-id,
>>>> maybe the event time;provide the restore state
>>>>         void open(InitContext context, StateT state);
>>>>
>>>>         // This is for the HiveSink. When all the writer say that the
>>>> the
>>>> bucket is finished it would return a GlobalCommitT
>>>>         Optional<GlobalCommT> combine(Committable commitables)
>>>>
>>>>         // This is for IcebergSink. Producing a GlobalCommitT every
>>>> checkpoint.
>>>>         Optional<GlobalCommT> preCommit();
>>>>
>>>>         // Maybe we need the shareState? After we decide the directory
>>>> we
>>>> make more detailed consideration then. The id could be remembered here.
>>>>         StateT snapshotState();
>>>>
>>>>         // developer would guarantee the idempotency by himself
>>>>         void commit(GlobalCommit globalCommit);
>>>> }
>>>>
>>>> ### Option2
>>>>
>>>> Actually the `GlobalCommit` in the option1 mixes the `Writer` and
>>>> `Committer` together. So it is intuitive to decouple the two functions.
>>>> For
>>>> support the hive we could prove a sink look like following
>>>>
>>>> Sink<In, LC, LCO, LCG> {
>>>>         Writer<In, LC> createWriter();
>>>>         Optional<Committer<LC, LCO>> createCommitter(); // we need this
>>>> to
>>>> change name.
>>>>         Optional<Writer<LCO, LCG>> createGlobalAgg();
>>>>         Optional<Committer<LCG, void>> createGlobalCommitter();
>>>> }
>>>>
>>>> The pro of this method is that we use two basic concepts: `Committer`
>>>> and
>>>> `Writer` to build a HiveSink.
>>>>
>>>> ### CompactHiveSink / MergeHiveSink
>>>>
>>>> There are still other complicated cases, which are not satisfied by the
>>>> above option. Users often complain about writing out many small files,
>>>> which will affect file reading efficiency and the performance and
>>>> stability
>>>> of the distributed file system. CompactHiveSink/MergeHiveSink hopes to
>>>> merge all files generated by this job in a single Checkpoint.
>>>>
>>>> The CompactHiveSink/MergeHiveSink topology can simply describe this
>>>> topology as follows:
>>>>
>>>> CompactSubTopology -> GlobalAgg -> GobalCommitter.
>>>>
>>>> The CompactSubTopology would look like following:
>>>>
>>>> TmpFileWriter -> CompactCoodinator -> CompactorFileWriter
>>>>
>>>> Maybe the topology could be simpler but please keep in mind I just want
>>>> to
>>>> show that there might be very complicated topology requirements for
>>>> users.
>>>>
>>>>
>>>> A possible alternative option would be let the user build the topology
>>>> himself. But considering we have two execution modes we could only use
>>>> `Writer` and `Committer` to build the sink topology.
>>>>
>>>> ### Build Topology Option
>>>>
>>>> Sink<IN, OUT> {
>>>>         Sink<In, Out> addWriter(Writer<In, Out> Writer); // Maybe a
>>>> WriterBuidler
>>>>         Sink<In, Out> addCommitter(Committer<In, Out> committer); //
>>>> Maybe
>>>> we could make this return Void if we do not consider code reuse and
>>>> introduce the cleaner
>>>> }
>>>>
>>>> ## Summary
>>>> The requirements of sink might be different, maybe we could use two
>>>> basic
>>>> bricks(Writer/Committer) to let the user build their own sink topology.
>>>> What do you guys think?
>>>>
>>>> I know the name stuff might be trikky for now but I want to discuss
>>>> these
>>>> things after we get the consus on the direction first.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com>
>>>> wrote:
>>>>
>>>> > Aljoscha,
>>>> >
>>>> > > Instead the sink would have to check for each set of committables
>>>> > seperately if they had already been committed. Do you think this is
>>>> > feasible?
>>>> >
>>>> > Yes, that is how it works in our internal implementation [1]. We
>>>> don't use
>>>> > checkpointId. We generate a manifest file (GlobalCommT) to bundle all
>>>> the
>>>> > data files that the committer received in one checkpoint cycle. Then
>>>> we
>>>> > generate a unique manifest id for by hashing the location of the
>>>> manifest
>>>> > file. The manifest ids are stored in Iceberg snapshot metadata. Upon
>>>> > restore, we check each of the restored manifest files against Iceberg
>>>> table
>>>> > snapshot metadata to determine if we should discard or keep the
>>>> restored
>>>> > manifest files. If a commit has multiple manifest files (e.g.
>>>> accumulated
>>>> > from previous failed commits), we store the comma-separated manifest
>>>> ids in
>>>> > Iceberg snapshot metadata.
>>>> >
>>>> > > During normal operation this set would be very small, it would
>>>> usually
>>>> > only be the committables for the last checkpoint. Only when there is
>>>> an
>>>> > outage would multiple sets of committables pile up.
>>>> >
>>>> > You are absolutely right here. Even if there are multiple sets of
>>>> > committables, it is usually the last a few or dozen of snapshots we
>>>> need to
>>>> > check. Even with our current inefficient implementation of traversing
>>>> all
>>>> > table snapshots (in the scale of thousands) from oldest to latest, it
>>>> only
>>>> > took avg 60 ms and max 800 ms. so it is really not a concern for
>>>> Iceberg.
>>>> >
>>>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
>>>> >
>>>> > Just to clarify on the terminology here. Assuming here the Committable
>>>> > meant the `GlobalCommT` (like ManifestFile in Iceberg) in
>>>> > previous discussions, right? `CommT` means the Iceberg DataFile from
>>>> writer
>>>> > to committer.
>>>> >
>>>> > This can work assuming we *don't have concurrent executions
>>>> > of commitGlobally* even with concurrent checkpoints. Here is the
>>>> scenario
>>>> > regarding failure recovery I want to avoid.
>>>> >
>>>> > Assuming checkpoints 1, 2, 3 all completed. Each checkpoint generates
>>>> a
>>>> > manifest file, manifest-1, 2, 3.
>>>> > timeline
>>>> >
>>>> ------------------------------------------------------------------------->
>>>> > now
>>>> > commitGlobally(manifest-1, nonce-1) started
>>>> >          commitGlobally(manifest-2, nonce-2) started
>>>> >                     commitGlobally(manifest-2, nonce-2) failed
>>>> >                             commitGlobally(manifest-2 and manifest-3,
>>>> > nonce-3) started
>>>> >                                     commitGlobally(manifest-1,
>>>> nonce-1)
>>>> > failed
>>>> >                                             commitGlobally(manifest-2
>>>> and
>>>> > manifest-3, nonce-3) succeeded
>>>> >
>>>> > Now the job failed and was restored from checkpoint 3, which contains
>>>> > manifest file 1,2,3. We found nonce-3 was committed when checking
>>>> Iceberg
>>>> > table snapshot metadata. But in this case we won't be able to
>>>> correctly
>>>> > determine which manifest files were committed or not.
>>>> >
>>>> > If it is possible to have concurrent executions of  commitGlobally,
>>>> the
>>>> > alternative is to generate the unique id/nonce per GlobalCommT. Then
>>>> we can
>>>> > check each individual GlobalCommT (ManifestFile) with Iceberg snapshot
>>>> > metadata.
>>>> >
>>>> > Thanks,
>>>> > Steven
>>>> >
>>>> > [1]
>>>> >
>>>> >
>>>> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569
>>>> >
>>>> > On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org
>>>> >
>>>> > wrote:
>>>> >
>>>> > > Steven,
>>>> > >
>>>> > > we were also wondering if it is a strict requirement that "later"
>>>> > > updates to Iceberg subsume earlier updates. In the current version,
>>>> you
>>>> > > only check whether checkpoint X made it to Iceberg and then discard
>>>> all
>>>> > > committable state from Flink state for checkpoints smaller X.
>>>> > >
>>>> > > If we go with a (somewhat random) nonce, this would not work.
>>>> Instead
>>>> > > the sink would have to check for each set of committables
>>>> seperately if
>>>> > > they had already been committed. Do you think this is feasible?
>>>> During
>>>> > > normal operation this set would be very small, it would usually
>>>> only be
>>>> > > the committables for the last checkpoint. Only when there is an
>>>> outage
>>>> > > would multiple sets of committables pile up.
>>>> > >
>>>> > > We were thinking to extend the GlobalCommitter interface to allow
>>>> it to
>>>> > > report success or failure and then let the framework retry. I think
>>>> this
>>>> > > is something that you would need for the Iceberg case. The signature
>>>> > > could be like this:
>>>> > >
>>>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
>>>> > >
>>>> > > where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE,
>>>> and
>>>> > > RETRY.
>>>> > >
>>>> > > Best,
>>>> > > Aljoscha
>>>> > >
>>>> >
>>>>
>>>

Reply via email to