Hi, Dawid

>> I still find the merging case the most confusing. I don't necessarily
>> understand why do you need the "SingleFileCommit" step in this scenario.
>> The way I understand "commit" operation is that it makes some
>> data/artifacts visible to the external system, thus it should be immutable
>> from a point of view of a single process. Having an additional step in the
>> same process that works on committed data contradicts with those
>> assumptions. I might be missing something though. Could you elaborate why
>> can't it be something like FileWriter -> FileMergeWriter -> Committer
>> (either global or non-global)? Again it might be just me not getting the
>> example.

I think you are right. The topology
"FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement.
The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter ->
GlobalCommitter" reuses some code from the StreamingFileSink (for example,
the rolling policy), which is why it has the "SingleFileCommitter" in it. In
general, I wanted to use this case to show that different requirements lead
to different topologies.
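
To make the simpler topology concrete, here is a minimal sketch of
"FileWriter -> FileMergeWriter -> Committer". It is only an illustration:
PendingFile, write, merge and commit are hypothetical names, not the
interfaces proposed in the FLIP, and files are simulated in memory. The point
it tries to show is that merging works on data that is not yet visible, so
the commit is the only step that publishes anything.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeTopologySketch {

    /** Hypothetical committable: a file that is written but not yet visible. */
    static final class PendingFile {
        final String name;
        final List<String> records;

        PendingFile(String name, List<String> records) {
            this.name = name;
            this.records = records;
        }
    }

    /** Stage 1 (FileWriter): rolls a new pending file every maxRecords records. */
    static List<PendingFile> write(List<String> records, int maxRecords) {
        List<PendingFile> files = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecords) {
            int end = Math.min(i + maxRecords, records.size());
            files.add(new PendingFile("part-" + files.size(),
                    new ArrayList<>(records.subList(i, end))));
        }
        return files;
    }

    /** Stage 2 (FileMergeWriter): merges pending files; nothing is committed yet. */
    static PendingFile merge(List<PendingFile> files) {
        List<String> all = new ArrayList<>();
        for (PendingFile f : files) {
            all.addAll(f.records);
        }
        return new PendingFile("merged-0", all);
    }

    /** Stage 3 (Committer): the only step that makes data visible (simulated). */
    static void commit(PendingFile file, List<String> visibleFiles) {
        visibleFiles.add(file.name);
    }

    public static void main(String[] args) {
        List<String> visible = new ArrayList<>();
        List<PendingFile> parts = write(List.of("a", "b", "c", "d", "e"), 2);
        commit(merge(parts), visible);
        System.out.println(parts.size() + " parts -> " + visible);
    }
}
```

In this sketch the small part files never become visible, which is the
property Dawid asked about; the variant with a "SingleFileCommitter" would
publish them first and merge afterwards.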

BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
actual topology of a HiveSink that supports merging is more complicated than
that.


>> I've just briefly skimmed over the proposed interfaces. I would suggest one
>> addition to the Writer interface (as I understand this is the runtime
>> interface in this proposal?): add some availability method, to avoid, if
>> possible, blocking calls on the sink. We already have similar
>> availability methods in the new sources [1] and in various places in the
>> network stack [2].
>> BTW Let's not forget about Piotr's comment. I think we could add the
>> isAvailable or similar method to the Writer interface in the FLIP.

Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There
are too many issues at the same time.

In addition to what Aljoscha said: there is very little system support for
it. Another thing I worry about is this: does the sink's snapshot return
immediately when the sink is unavailable? Maybe we could handle that by
deduplicating some elements in the state, but I think it might be too
complicated. What I would like to know is which specific sinks would benefit
from this feature. @piotr <pi...@ververica.com> Please correct me if I
misunderstand you. Thanks.
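
For the sake of discussion, here is a minimal sketch of what such an
availability method could look like on a writer. Everything in it is an
assumption on my side: BoundedWriter, acknowledgeOne and the future-based
return type are hypothetical and not part of the FLIP. It does not answer the
snapshot question above; it only shows how a caller could check availability
instead of blocking in write().

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AvailabilitySketch {

    /** Hypothetical writer that allows at most `capacity` in-flight elements. */
    static final class BoundedWriter {
        private final int capacity;
        private final Queue<String> inFlight = new ArrayDeque<>();
        private CompletableFuture<Void> available =
                CompletableFuture.completedFuture(null);

        BoundedWriter(int capacity) {
            this.capacity = capacity;
        }

        /** A completed future means the next write() will be accepted. */
        CompletableFuture<Void> isAvailable() {
            return available;
        }

        /** Accepts an element; the caller is expected to check isAvailable() first. */
        void write(String element) {
            if (inFlight.size() >= capacity) {
                throw new IllegalStateException("writer is not available");
            }
            inFlight.add(element);
            if (inFlight.size() >= capacity) {
                // Full: hand out a fresh future that completes once there is room.
                available = new CompletableFuture<>();
            }
        }

        /** Simulates the external system acknowledging the oldest element. */
        void acknowledgeOne() {
            inFlight.poll();
            if (inFlight.size() < capacity && !available.isDone()) {
                available.complete(null);
            }
        }
    }

    public static void main(String[] args) {
        BoundedWriter writer = new BoundedWriter(2);
        writer.write("a");
        writer.write("b");
        System.out.println(writer.isAvailable().isDone()); // false: buffer is full
        writer.acknowledgeOne();
        System.out.println(writer.isAvailable().isDone()); // true: room again
    }
}
```

A sink would only benefit from this if its external system can apply back
pressure asynchronously, which is exactly the kind of concrete example I am
asking for.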

Best,
Guowei


On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> What I understand is that HiveSink's implementation might need the local
> committer(FileCommitter) because the file rename is needed.
> But the iceberg only needs to write the manifest file.  Would you like to
> enlighten me why the Iceberg needs the local committer?
> Thanks
>
> Sorry if I caused a confusion here. I am not saying the Iceberg sink needs
> a local committer. What I had in mind is that prior to the Iceberg example
> I did not see a need for a "GlobalCommitter" in the streaming case. I
> thought it is always enough to have the "normal" committer in that case.
> Now I understand that this differentiation is not really about logical
> separation. It is not really about the granularity with which we commit,
> i.e. answering the "WHAT" question. It is really about the performance and
> that in the end we will have a single "transaction", so it is about
> answering the question "HOW".
>
>
>    - Commit a directory with merged files (some users want to merge the
>      files in a directory before committing the directory to Hive meta
>      store)
>
>      1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>
> I still find the merging case the most confusing. I don't necessarily
> understand why do you need the "SingleFileCommit" step in this scenario.
> The way I understand "commit" operation is that it makes some
> data/artifacts visible to the external system, thus it should be immutable
> from a point of view of a single process. Having an additional step in the
> same process that works on committed data contradicts with those
> assumptions. I might be missing something though. Could you elaborate why
> can't it be something like FileWriter -> FileMergeWriter -> Committer
> (either global or non-global)? Again it might be just me not getting the
> example.
>
> I've just briefly skimmed over the proposed interfaces. I would suggest one
> addition to the Writer interface (as I understand this is the runtime
> interface in this proposal?): add some availability method, to avoid, if
> possible, blocking calls on the sink. We already have similar
> availability methods in the new sources [1] and in various places in the
> network stack [2].
>
> BTW Let's not forget about Piotr's comment. I think we could add the
> isAvailable or similar method to the Writer interface in the FLIP.
>
> Best,
>
> Dawid
> On 15/09/2020 08:06, Guowei Ma wrote:
>
> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput<CommT> output);
>
> ver 1:
>
> List<StateT> snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
> I second Dawid's proposal. This is a valid scenario. And version2 does not
> need the snapshotState() any more.
>
>
> The Committer is as described in the FLIP, it's basically a function
> "void commit(Committable)". The GlobalCommitter would be a function "void
> commit(List<Committable>)". The former would be used by an S3 sink where
> we can individually commit files to S3, a committable would be the list
> of part uploads that will form the final file and the commit operation
> creates the metadata in S3. The latter would be used by something like
> Iceberg where the Committer needs a global view of all the commits to be
> efficient and not overwhelm the system.
>
> I don't know yet if sinks would only implement one type of commit
> function or potentially both at the same time, and maybe Commit can
> return some CommitResult that gets shipped to the GlobalCommit function.
> I must admit I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. I think it makes a
> lot of sense.
>
> @Dawid
> What I understand is that HiveSink's implementation might need the local
> committer(FileCommitter) because the file rename is needed.
> But the iceberg only needs to write the manifest file.  Would you like to
> enlighten me why the Iceberg needs the local committer?
> Thanks
>
> Best,
> Guowei
>
>
> On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>
> Hi all,
>
>
> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput<CommT> output);
>
> ver 1:
>
> List<StateT> snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
>
> The Committer is as described in the FLIP, it's basically a function
> "void commit(Committable)". The GlobalCommitter would be a function "void
> commit(List<Committable>)". The former would be used by an S3 sink where
> we can individually commit files to S3, a committable would be the list
> of part uploads that will form the final file and the commit operation
> creates the metadata in S3. The latter would be used by something like
> Iceberg where the Committer needs a global view of all the commits to be
> efficient and not overwhelm the system.
>
> I don't know yet if sinks would only implement one type of commit
> function or potentially both at the same time, and maybe Commit can
> return some CommitResult that gets shipped to the GlobalCommit function.
>
> I must admit I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. I think it makes a
> lot of sense.
>
>
> For Iceberg, writers don't need any state. But the GlobalCommitter
> needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> collected thousands data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint cycle
> [2].
> ------------------
> StateT snapshotState(SnapshotContext context) throws Exception;
>
> That means we also need the restoreCommitter API in the Sink interface
> ---------------
> Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> state);
>
> I think this might be a valid case. Not sure though if I would go with a
> "state" there. Having a state in a committer would imply we need a
> collect method as well. So far we needed a single method commit(...) and
> the bookkeeping of the committables could be handled by the framework. I
> think something like an optional combiner in the GlobalCommitter would
> be enough. What do you think?
>
> GlobalCommitter<CommT, GlobalCommT> {
>
>     void commit(GlobalCommT globalCommittables);
>
>     GlobalCommT combine(List<CommT> committables);
>
> }
>
> A different problem that I see here is how do we handle commit failures.
> Should the committables (both normal and global) be included in the next
> cycle, shall we retry it, ...? I think it would be worth laying it out
> in the FLIP.
>
> @Aljoscha I think you can find the code Steven was referring to here:
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>
> Best,
>
> Dawid
>
> On 14/09/2020 15:19, Aljoscha Krettek wrote:
>
> On 14.09.20 01:23, Steven Wu wrote:
>
> ## Writer interface
>
> For the Writer interface, should we add "prepareSnapshot" before the
> checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
> would the framework call "flush" before the barrier is emitted
> downstream? That guarantee would achieve the same goal.
>
> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".
>
> @Guowei, what do you think about this?
>
>
> In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> tuple to the committer. The committer needs checkpointId to separate out
> data files for different checkpoints if concurrent checkpoints are
> enabled.
>
> When can this happen? Even with concurrent checkpoints the snapshot
> barriers would still cleanly segregate the input stream of an operator
> into tranches that should manifest in only one checkpoint. With
> concurrent checkpoints, all that can happen is that we start a
> checkpoint before a last one is confirmed completed.
>
> Unless there is some weirdness in the sources and some sources start
> chk1 first and some other ones start chk2 first?
>
> @Piotrek, do you think this is a problem?
>
>
> For the Committer interface, I am wondering if we should split the single
> commit method into separate "collect" and "commit" methods? This way,
> it can handle both single and multiple CommT objects.
>
> I think we can't do this. If the sink only needs a regular Committer,
> we can perform the commits in parallel, possibly on different
> machines. Only when the sink needs a GlobalCommitter would we need to
> ship all commits to a single process and perform the commit there. If
> both methods were unified in one interface we couldn't make the
> decision of where to commit in the framework code.
>
>
> For Iceberg, writers don't need any state. But the GlobalCommitter
> needs to
> checkpoint StateT. For the committer, CommT is "DataFile". Since a
> single
> committer can collect thousands (or more) data files in one checkpoint
> cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> collected thousands data files) as StateT. This allows us to absorb
> extended commit outages without losing written/uploaded data files, as
> operator state size is as small as one manifest file per checkpoint
> cycle
>
> You could have a point here. Is the code for this available in
> open-source? I was checking out
>
>
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>
> and didn't find the ManifestFile optimization there.
>
> Best,
> Aljoscha
>
>
>
