Guowei, thanks a lot for the summary. Here are a couple more questions that
need clarification for the GlobalCommitter case.

* Unique id: should the framework provide some sort of unique id per
GlobalCommT (e.g. a nonce or some sort of transaction id)?
* Commit failure handling: should we roll over to the next cycle? If so, we
may need commit(List<GlobalCommT>).
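
To make the two points above concrete, here is a rough Java sketch. It is
not a proposal for the actual interfaces: `Identified`, `GlobalCommitDriver`,
`onCheckpointComplete` and the List-returning `commit` are all hypothetical
names that do not exist in Flink or in the FLIP. It assumes the framework
assigns a monotonically increasing id to each GlobalCommT and that failed
committables roll over to the next cycle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types exist in Flink or in the FLIP.

/** A global committable tagged with a framework-assigned id. */
final class Identified<T> {
    final long id;   // assumed to be unique and monotonically increasing
    final T payload;

    Identified(long id, T payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** Assumed batch variant of commit: returns the entries that failed. */
interface GlobalCommitter<GlobalCommT> {
    List<GlobalCommT> commit(List<GlobalCommT> globalCommittables);
}

/** Stand-in for the framework-side bookkeeping around the committer. */
final class GlobalCommitDriver<GlobalCommT> {
    private final GlobalCommitter<Identified<GlobalCommT>> committer;
    private final List<Identified<GlobalCommT>> pending = new ArrayList<>();
    private long nextId = 0;

    GlobalCommitDriver(GlobalCommitter<Identified<GlobalCommT>> committer) {
        this.committer = committer;
    }

    /** Called once per cycle with the newly combined committable. */
    void onCheckpointComplete(GlobalCommT combined) {
        pending.add(new Identified<>(nextId++, combined));
        // Hand over a copy so the committer cannot mutate our bookkeeping.
        List<Identified<GlobalCommT>> failed = committer.commit(new ArrayList<>(pending));
        // Whatever failed rolls over and is retried with the next cycle.
        pending.clear();
        pending.addAll(failed);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this sketch the pending list lives only in memory. A real implementation
would presumably have to checkpoint it, and the ids would let the external
system (or the committer) recognize a committable it has already applied.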

On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hey
>
> Thanks Dawid for bringing up my suggestion :)
>
> > I'm not so sure about this, the sinks I'm aware of would not be able to
> > implement this method: Kafka doesn't have this, I didn't see it in the
> > Iceberg interfaces, and HDFS/S3 also don't have it.
>
> Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we
> could do some magic. At the very least we could use
> `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when some
> threshold is exceeded. Alternatively, maybe we could hook in to the
> KafkaProducer's buffer state [1]:
>
> > The buffer.memory controls the total amount of memory available to the
> producer for buffering.
> > If records are sent faster than they can be transmitted to the server
> then this buffer space will be exhausted.
> > When the buffer space is exhausted additional send calls will block.
>
> As far as I can see, Kafka is exposing the `buffer-available-bytes`
> metric, which we might use instead of `pendingRecords`. Heck, we are
> already hacking KafkaProducer with reflection, so we could access the
> `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field and
> call the `accumulator.bufferPoolAvailableMemory()` method, if the metric
> would be too expensive to check for every record.
>
> Furthermore, I'm pretty sure other sinks (maybe not all) provide similar
> features. If we are desperate, we could always contribute something to
> those systems to make them expose the internal buffer's state.
>
> If we are really desperate, we could provide a generic records handover
> wrapper sink, that would have a buffer of N (5? 10? ) records and would be
> handing over those records to the blocking sink running in another thread.
> If the buffer is full, the sink would be unavailable.
>
> Guowei
> > Does the sink's snapshot return immediately when the sink's status is
> unavailable?
>
> The state snapshot call is, generally speaking, already non blocking, so it
> should not be an issue. If it is blocking, and if it would solve some
> problem, we could later decide in the runtime code not to execute snapshot
> calls while a sink is unavailable. Think of isAvailable more as a hint
> from the operator to the runtime, which we can use to make better
> decisions. Also take a look at the FLIP-27 sources (`SourceReader`), where
> there is already an `isAvailable()` method. It would be best if the new
> sinks just duplicated the same contract.
>
> > For me I want to know is what specific sink will benefit from this
> feature
>
> It's not the sinks that would benefit from this, but other parts of the
> system. Currently the task thread is blocked on a backpressured sink, which
> blocks some things from happening (checkpointing, closing, ...). If we
> make sinks non blocking (as the network stack mostly is, and as the
> FLIP-27 sources are), we will be able to snapshot the state of the operator
> immediately. For example, the change from blocking to non blocking sources
> sped up unaligned checkpoints from ~30 seconds down to ~5 seconds in
> our benchmarks, but the difference can be even more profound (hours instead
> of seconds/minutes as reported by some users).
>
> Piotrek
>
> [1]
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> On Wed, Sep 16, 2020 at 06:29 Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks for all your valuable opinions and ideas. Currently there are many
>> topics in this thread. I will try to summarize what has consensus and what
>> does not. Correct me if I am wrong.
>>
>> ## Consensus
>>
>> 1. The motivation of the unified sink API is to decouple the sink
>> implementation from the different runtime execution modes.
>> 2. The initial scope of the unified sink API only covers the file system
>> type, which supports real transactions. The FLIP focuses more on the
>> semantics the new sink API should support.
>> 3. We prefer the first alternative API, which could give the framework a
>> greater opportunity to optimize.
>> 4. The `Writer` needs to add a method `prepareCommit`, which would be
>> called from `prepareSnapshotPreBarrier`, and to remove the `flush` method.
>> 5. The FLIP could move the `Snapshot & Drain` section in order to be more
>> focused.
>>
>> ## No Consensus
>>
>> 1. What should the “Unified Sink API” support/cover? The API can
>> “unify” (decouple) the commit operation in terms of supporting exactly
>> once semantics. However, even if we narrow down the initially supported
>> systems to the file system, there would be different topology requirements.
>> These requirements come from performance optimization
>> (IceBergSink/MergeHiveSink) or functionality (i.e. whether a bucket is
>> “finished”). Should the unified sink API support these requirements?
>> 2. The API does not expose the checkpoint-id because the batch execution
>> mode does not have the normal checkpoint. But some implementations still
>> depend on it (IceBergSink uses it to do some dedupe).
>> I think how to support this requirement depends on the first open
>> question.
>> 3. Whether the `Writer` supports async functionality or not. Currently I
>> do not know which sink could benefit from it. Maybe it is just my own
>> problem.
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>> >
>> > Hi, Steven
>> > Thank you for your thoughtful ideas and concerns.
>> >
>> > >> I still like the concept of grouping data files per checkpoint for
>> > >> streaming mode. it is cleaner and probably easier to manage and deal
>> > >> with commit failures. Plus, it can reduce dupes for the at least once
>> > >> mode. I understand checkpoint is not an option for batch execution. We
>> > >> don't have to expose the checkpointId in API, as long as the internal
>> > >> bookkeeping groups data files by checkpoints for streaming mode.
>> >
>> > I think this problem (how to dedupe the combined committed data) also
>> > depends on where to place the agg/combine logic.
>> >
>> > 1. If the agg/combine takes place in the “commit”, maybe we need to
>> > figure out how to give the aggregated committable a unique and
>> > auto-incrementing id in the committer.
>> > 2. If the agg/combine takes place in a separate operator, maybe the sink
>> > developer could maintain the id itself by using the state.
>> >
>> > I think this problem also depends on which topology patterns the sink
>> > API should support. Actually there are already many other topology
>> > requirements. :)
>> >
>> > Best,
>> > Guowei
>> >
>> >
>> > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >
>> >> > AFAIK the committer would not see the file-1-2 when ck1 happens in
>> >> > the ExactlyOnce mode.
>> >>
>> >> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly
>> >> once checkpoint semantics. What about "at least once"? I guess we can
>> >> argue that it is fine to commit file-1-2 for at least once mode.
>> >>
>> >> I still like the concept of grouping data files per checkpoint for
>> >> streaming mode. It is cleaner and probably easier to manage and deal
>> >> with commit failures. Plus, it can reduce dupes for the at least once
>> >> mode. I understand checkpoint is not an option for batch execution. We
>> >> don't have to expose the checkpointId in API, as long as the internal
>> >> bookkeeping groups data files by checkpoints for streaming mode.
>> >>
>> >>
>> >> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com>
>> wrote:
>> >>
>> >>> > images don't make it through to the mailing lists. You would need to
>> >>> host the file somewhere and send a link.
>> >>>
>> >>> Sorry about that. Here is the sample DAG in google drawings.
>> >>>
>> >>>
>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>> >>>
>> >>>
>> >>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com>
>> wrote:
>> >>>
>> >>>> Hi, Dawid
>> >>>>
>> >>>> >> I still find the merging case the most confusing. I don't necessarily
>> >>>> >> understand why do you need the "SingleFileCommit" step in this
>> >>>> >> scenario. The way I understand "commit" operation is that it makes
>> >>>> >> some data/artifacts visible to the external system, thus it should be
>> >>>> >> immutable from a point of view of a single process. Having an
>> >>>> >> additional step in the same process that works on committed data
>> >>>> >> contradicts with those assumptions. I might be missing something
>> >>>> >> though. Could you elaborate why can't it be something like
>> >>>> >> FileWriter -> FileMergeWriter -> Committer (either global or
>> >>>> >> non-global)? Again it might be just me not getting the example.
>> >>>>
>> >>>> I think you are right. The topology
>> >>>> "FileWriter->FileMergeWriter->Committer" could meet the merge
>> >>>> requirement. The topology "FileWriter -> SingleFileCommitter ->
>> >>>> FileMergeWriter -> GlobalCommitter" reuses some code of the
>> >>>> StreamingFileSink (for example the rolling policy), so it has the
>> >>>> "SingleFileCommitter" in the topology. In general I want to use this
>> >>>> case to show that there are different topologies according to the
>> >>>> requirements.
>> >>>>
>> >>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
>> >>>> actual topology of the HiveSink with merge support is more complicated
>> >>>> than that.
>> >>>>
>> >>>>
>> >>>> >> I've just briefly skimmed over the proposed interfaces. I would
>> >>>> >> suggest one addition to the Writer interface (as I understand this is
>> >>>> >> the runtime interface in this proposal?): add some availability
>> >>>> >> method, to avoid, if possible, blocking calls on the sink. We already
>> >>>> >> have similar availability methods in the new sources [1] and in
>> >>>> >> various places in the network stack [2].
>> >>>> >> BTW Let's not forget about Piotr's comment. I think we could add the
>> >>>> >> isAvailable or similar method to the Writer interface in the FLIP.
>> >>>>
>> >>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder.
>> >>>> There are too many issues being discussed at the same time.
>> >>>>
>> >>>> In addition to what Aljoscha said (very few systems support it),
>> >>>> another thing I worry about is: does the sink's snapshot return
>> >>>> immediately when the sink's status is unavailable? Maybe we could
>> >>>> handle it by deduplicating some elements in the state, but I think that
>> >>>> might be too complicated. What I want to know is which specific sink
>> >>>> would benefit from this feature. @piotr <pi...@ververica.com> Please
>> >>>> correct me if I misunderstand you. Thanks.
>> >>>>
>> >>>> Best,
>> >>>> Guowei
>> >>>>
>> >>>>
>> >>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <
>> >>>> dwysakow...@apache.org>
>> >>>> wrote:
>> >>>>
>> >>>> > What I understand is that HiveSink's implementation might need the
>> >>>> > local committer(FileCommitter) because the file rename is needed.
>> >>>> > But the iceberg only needs to write the manifest file.  Would you
>> >>>> > like to enlighten me why the Iceberg needs the local committer?
>> >>>> > Thanks
>> >>>> >
>> >>>> > Sorry if I caused a confusion here. I am not saying the Iceberg sink
>> >>>> > needs a local committer. What I had in mind is that prior to the
>> >>>> > Iceberg example I did not see a need for a "GlobalCommitter" in the
>> >>>> > streaming case. I thought it is always enough to have the "normal"
>> >>>> > committer in that case. Now I understand that this differentiation is
>> >>>> > not really about logical separation. It is not really about the
>> >>>> > granularity with which we commit, i.e. answering the "WHAT" question.
>> >>>> > It is really about the performance and that in the end we will have a
>> >>>> > single "transaction", so it is about answering the question "HOW".
>> >>>> >
>> >>>> >
>> >>>> >    -
>> >>>> >
>> >>>> >    Commit a directory with merged files(Some user want to merge the
>> >>>> files
>> >>>> >    in a directory before committing the directory to Hive meta
>> store)
>> >>>> >
>> >>>> >
>> >>>> >    1.
>> >>>> >
>> >>>> >    FileWriter -> SingleFileCommit -> FileMergeWriter  ->
>> >>>> GlobalCommitter
>> >>>> >
>> >>>> > I still find the merging case the most confusing. I don't necessarily
>> >>>> > understand why do you need the "SingleFileCommit" step in this
>> >>>> > scenario. The way I understand "commit" operation is that it makes
>> >>>> > some data/artifacts visible to the external system, thus it should be
>> >>>> > immutable from a point of view of a single process. Having an
>> >>>> > additional step in the same process that works on committed data
>> >>>> > contradicts with those assumptions. I might be missing something
>> >>>> > though. Could you elaborate why can't it be something like
>> >>>> > FileWriter -> FileMergeWriter -> Committer (either global or
>> >>>> > non-global)? Again it might be just me not getting the example.
>> >>>> >
>> >>>> > I've just briefly skimmed over the proposed interfaces. I would
>> >>>> > suggest one addition to the Writer interface (as I understand this is
>> >>>> > the runtime interface in this proposal?): add some availability
>> >>>> > method, to avoid, if possible, blocking calls on the sink. We already
>> >>>> > have similar availability methods in the new sources [1] and in
>> >>>> > various places in the network stack [2].
>> >>>> >
>> >>>> > BTW Let's not forget about Piotr's comment. I think we could add the
>> >>>> > isAvailable or similar method to the Writer interface in the FLIP.
>> >>>> >
>> >>>> > Best,
>> >>>> >
>> >>>> > Dawid
>> >>>> > On 15/09/2020 08:06, Guowei Ma wrote:
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that
>> it
>> >>>> > prepares for a commit, so on a physical level it would be called
>> from
>> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> >>>> > think flush() should be renamed to something like
>> "prepareCommit()".
>> >>>> >
>> >>>> > Generally speaking it is a good point that emitting the
>> committables
>> >>>> > should happen before emitting the checkpoint barrier downstream.
>> >>>> > However, if I remember offline discussions well, the idea behind
>> >>>> > Writer#flush and Writer#snapshotState was to differentiate commit
>> on
>> >>>> > checkpoint vs final checkpoint at the end of the job. Both of these
>> >>>> > methods could emit committables, but the flush should not leave
>> any in
>> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
>> >>>> > snapshotState it could leave some open files that would be
>> committed
>> >>>> in
>> >>>> > a subsequent cycle, however flush should close all files). The
>> >>>> > snapshotState as it is now can not be called in
>> >>>> > prepareSnapshotPreBarrier as it can store some state, which should
>> >>>> > happen in Operator#snapshotState as otherwise it would always be
>> >>>> > synchronous. Therefore I think we would need sth like:
>> >>>> >
>> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >>>> >
>> >>>> > ver 1:
>> >>>> >
>> >>>> > List<StateT> snapshotState();
>> >>>> >
>> >>>> > ver 2:
>> >>>> >
>> >>>> > // not sure if we need that method at all in option 2
>> >>>> > void snapshotState();
>> >>>> >
>> >>>> > I second Dawid's proposal. This is a valid scenario. And version2
>> >>>> does not
>> >>>> > need the snapshotState() any more.
>> >>>> >
>> >>>> >
>> >>>> > The Committer is as described in the FLIP, it's basically a function
>> >>>> > "void commit(Committable)". The GlobalCommitter would be a function
>> >>>> > "void commit(List<Committable>)". The former would be used by an S3
>> >>>> > sink where we can individually commit files to S3, a committable would
>> >>>> > be the list of part uploads that will form the final file and the
>> >>>> > commit operation creates the metadata in S3. The latter would be used
>> >>>> > by something like Iceberg where the Committer needs a global view of
>> >>>> > all the commits to be efficient and not overwhelm the system.
>> >>>> >
>> >>>> > I don't know yet if sinks would only implement one type of commit
>> >>>> > function or potentially both at the same time, and maybe Commit can
>> >>>> > return some CommitResult that gets shipped to the GlobalCommit
>> >>>> > function.
>> >>>> > I must admit I did not get the need for Local/Normal + Global
>> >>>> > committer at first. The Iceberg example helped a lot. I think it
>> >>>> > makes a lot of sense.
>> >>>> >
>> >>>> > @Dawid
>> >>>> > What I understand is that HiveSink's implementation might need the
>> >>>> local
>> >>>> > committer(FileCommitter) because the file rename is needed.
>> >>>> > But the iceberg only needs to write the manifest file.  Would you
>> >>>> like to
>> >>>> > enlighten me why the Iceberg needs the local committer?
>> >>>> > Thanks
>> >>>> >
>> >>>> > Best,
>> >>>> > Guowei
>> >>>> >
>> >>>> >
>> >>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <
>> >>>> dwysakow...@apache.org> <dwysakow...@apache.org>
>> >>>> > wrote:
>> >>>> >
>> >>>> >
>> >>>> > Hi all,
>> >>>> >
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that
>> it
>> >>>> > prepares for a commit, so on a physical level it would be called
>> from
>> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> >>>> > think flush() should be renamed to something like
>> "prepareCommit()".
>> >>>> >
>> >>>> > Generally speaking it is a good point that emitting the
>> committables
>> >>>> > should happen before emitting the checkpoint barrier downstream.
>> >>>> > However, if I remember offline discussions well, the idea behind
>> >>>> > Writer#flush and Writer#snapshotState was to differentiate commit
>> on
>> >>>> > checkpoint vs final checkpoint at the end of the job. Both of these
>> >>>> > methods could emit committables, but the flush should not leave
>> any in
>> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
>> >>>> > snapshotState it could leave some open files that would be
>> committed
>> >>>> in
>> >>>> > a subsequent cycle, however flush should close all files). The
>> >>>> > snapshotState as it is now can not be called in
>> >>>> > prepareSnapshotPreBarrier as it can store some state, which should
>> >>>> > happen in Operator#snapshotState as otherwise it would always be
>> >>>> > synchronous. Therefore I think we would need sth like:
>> >>>> >
>> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >>>> >
>> >>>> > ver 1:
>> >>>> >
>> >>>> > List<StateT> snapshotState();
>> >>>> >
>> >>>> > ver 2:
>> >>>> >
>> >>>> > // not sure if we need that method at all in option 2
>> >>>> > void snapshotState();
>> >>>> >
>> >>>> >
>> >>>> > The Committer is as described in the FLIP, it's basically a function
>> >>>> > "void commit(Committable)". The GlobalCommitter would be a function
>> >>>> > "void commit(List<Committable>)". The former would be used by an S3
>> >>>> > sink where we can individually commit files to S3, a committable would
>> >>>> > be the list of part uploads that will form the final file and the
>> >>>> > commit operation creates the metadata in S3. The latter would be used
>> >>>> > by something like Iceberg where the Committer needs a global view of
>> >>>> > all the commits to be efficient and not overwhelm the system.
>> >>>> >
>> >>>> > I don't know yet if sinks would only implement one type of commit
>> >>>> > function or potentially both at the same time, and maybe Commit can
>> >>>> > return some CommitResult that gets shipped to the GlobalCommit
>> >>>> > function.
>> >>>> >
>> >>>> > I must admit I did not get the need for Local/Normal + Global
>> >>>> > committer at first. The Iceberg example helped a lot. I think it
>> >>>> > makes a lot of sense.
>> >>>> >
>> >>>> >
>> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>> >>>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>> >>>> > Since a single committer can collect thousands (or more) of data files
>> >>>> > in one checkpoint cycle, as an optimization we checkpoint a single
>> >>>> > "ManifestFile" (for the thousands of collected data files) as StateT.
>> >>>> > This allows us to absorb extended commit outages without losing
>> >>>> > written/uploaded data files, as operator state size is as small as one
>> >>>> > manifest file per checkpoint cycle [2].
>> >>>> > ------------------
>> >>>> > StateT snapshotState(SnapshotContext context) throws Exception;
>> >>>> >
>> >>>> > That means we also need the restoreCommitter API in the Sink
>> interface
>> >>>> > ---------------
>> >>>> > Committer<CommT, StateT> restoreCommitter(InitContext context,
>> StateT
>> >>>> > state);
>> >>>> >
>> >>>> > I think this might be a valid case. Not sure though if I would go
>> >>>> with a
>> >>>> > "state" there. Having a state in a committer would imply we need a
>> >>>> > collect method as well. So far we needed a single method
>> commit(...)
>> >>>> and
>> >>>> > the bookkeeping of the committables could be handled by the
>> >>>> framework. I
>> >>>> > think something like an optional combiner in the GlobalCommitter
>> would
>> >>>> > be enough. What do you think?
>> >>>> >
>> >>>> > interface GlobalCommitter<CommT, GlobalCommT> {
>> >>>> >
>> >>>> >     void commit(GlobalCommT globalCommittables);
>> >>>> >
>> >>>> >     GlobalCommT combine(List<CommT> committables);
>> >>>> >
>> >>>> > }
>> >>>> >
>> >>>> > A different problem that I see here is how do we handle commit
>> >>>> > failures. Should the committables (both normal and global) be included
>> >>>> > in the next cycle, shall we retry it, ...? I think it would be worth
>> >>>> > laying it out in the FLIP.
>> >>>> >
>> >>>> > @Aljoscha I think you can find the code Steven was referring in
>> here:
>> >>>> >
>> >>>>
>> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>> >>>> >
>> >>>> > Best,
>> >>>> >
>> >>>> > Dawid
>> >>>> >
>> >>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>> >>>> >
>> >>>> > On 14.09.20 01:23, Steven Wu wrote:
>> >>>> >
>> >>>> > ## Writer interface
>> >>>> >
>> >>>> > For the Writer interface, should we add "*prepareSnapshot"* before
>> the
>> >>>> > checkpoint barrier emitted downstream?  IcebergWriter would need
>> it.
>> >>>> Or
>> >>>> > would the framework call "*flush*" before the barrier emitted
>> >>>> > downstream?
>> >>>> > that guarantee would achieve the same goal.
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that
>> it
>> >>>> > prepares for a commit, so on a physical level it would be called
>> from
>> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> >>>> > think flush() should be renamed to something like
>> "prepareCommit()".
>> >>>> >
>> >>>> > @Guowei, what do you think about this?
>> >>>> >
>> >>>> >
>> >>>> > In [1], we discussed the reason for Writer to emit (checkpointId,
>> >>>> CommT)
>> >>>> > tuple to the committer. The committer needs checkpointId to
>> separate
>> >>>> out
>> >>>> > data files for different checkpoints if concurrent checkpoints are
>> >>>> > enabled.
>> >>>> >
>> >>>> > When can this happen? Even with concurrent checkpoints the snapshot
>> >>>> > barriers would still cleanly segregate the input stream of an
>> operator
>> >>>> > into tranches that should manifest in only one checkpoint. With
>> >>>> > concurrent checkpoints, all that can happen is that we start a
>> >>>> > checkpoint before a last one is confirmed completed.
>> >>>> >
>> >>>> > Unless there is some weirdness in the sources and some sources
>> start
>> >>>> > chk1 first and some other ones start chk2 first?
>> >>>> >
>> >>>> > @Piotrek, do you think this is a problem?
>> >>>> >
>> >>>> >
>> >>>> > For the Committer interface, I am wondering if we should split the
>> >>>> > single
>> >>>> > commit method into separate "*collect"* and "*commit"* methods?
>> This
>> >>>> > way,
>> >>>> > it can handle both single and multiple CommT objects.
>> >>>> >
>> >>>> > I think we can't do this. If the sink only needs a regular Committer,
>> >>>> > we can perform the commits in parallel, possibly on different
>> >>>> > machines. Only when the sink needs a GlobalCommitter would we need to
>> >>>> > ship all commits to a single process and perform the commit there. If
>> >>>> > both methods were unified in one interface we couldn't make the
>> >>>> > decision of where to commit in the framework code.
>> >>>> >
>> >>>> >
>> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
>> >>>> > needs to checkpoint StateT. For the committer, CommT is "DataFile".
>> >>>> > Since a single committer can collect thousands (or more) of data files
>> >>>> > in one checkpoint cycle, as an optimization we checkpoint a single
>> >>>> > "ManifestFile" (for the thousands of collected data files) as StateT.
>> >>>> > This allows us to absorb extended commit outages without losing
>> >>>> > written/uploaded data files, as operator state size is as small as one
>> >>>> > manifest file per checkpoint cycle
>> >>>> >
>> >>>> > You could have a point here. Is the code for this available in
>> >>>> > open-source? I was checking out
>> >>>> >
>> >>>> >
>> >>>> >
>> >>>>
>> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>> >>>> >
>> >>>> > and didn't find the ManifestFile optimization there.
>> >>>> >
>> >>>> > Best,
>> >>>> > Aljoscha
>> >>>> >
>> >>>> >
>> >>>> >
>> >>>>
>> >>>
>>
>
