Hi, Dawid

>> I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.
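Very loosely, the alternative topology Dawid suggests (FileWriter -> FileMergeWriter -> Committer) could be sketched as below. This is a toy model under assumed names, not Flink API: the stage names come from the discussion, everything else (files modeled as strings, the helper class) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the FileWriter -> FileMergeWriter -> Committer topology.
// Stage names come from the discussion; everything else is made up.
class MergeTopologySketch {
    // "FileWriter": produces in-progress files (modeled as names).
    static List<String> writeFiles(List<String> records) {
        List<String> files = new ArrayList<>();
        for (String r : records) {
            files.add("part-" + r);
        }
        return files;
    }

    // "FileMergeWriter": merges the small files before anything is
    // committed, so no step ever mutates already-committed data.
    static String mergeFiles(List<String> files) {
        return "merged(" + String.join("+", files) + ")";
    }

    // "Committer": the single step that makes data visible externally.
    static String commit(String mergedFile) {
        return "committed:" + mergedFile;
    }
}
```

The point of the sketch is that the merge happens strictly before the (single) commit, which is what keeps committed data immutable.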
I think you are right. The topology "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement. The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some code of the StreamingFileSink (for example, the rolling policy), which is why it has the "SingleFileCommitter" in the topology. In general I wanted to use the case to show that there are different topologies according to the requirements. BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the actual topology of the merge-supporting HiveSink is more complicated than that.

>> I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2].

>> BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.

Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There are too many issues at the same time. In addition to what Aljoscha said, very few systems support it. Another thing I worry about is: does the sink's snapshot return immediately when the sink's status is unavailable? Maybe we could do it by deduplicating some elements in the state, but I think that might be too complicated. What I want to know is which specific sinks would benefit from this feature. @piotr <pi...@ververica.com> Please correct me if I misunderstand you. Thanks.

Best,
Guowei

On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed. But the iceberg only needs to write the manifest file.
> Would you like to enlighten me why the Iceberg needs the local committer? Thanks

> Sorry if I caused a confusion here. I am not saying the Iceberg sink needs a local committer. What I had in mind is that prior to the Iceberg example I did not see a need for a "GlobalCommitter" in the streaming case. I thought it was always enough to have the "normal" committer in that case. Now I understand that this differentiation is not really about logical separation. It is not really about the granularity with which we commit, i.e. answering the "WHAT" question. It is really about the performance and that in the end we will have a single "transaction", so it is about answering the question "HOW".

> - Commit a directory with merged files (some users want to merge the files in a directory before committing the directory to the Hive metastore)
>   1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter

> I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.

> I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink.
> We already have similar availability methods in the new sources [1] and in various places in the network stack [2].

> BTW Let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.

> Best,
> Dawid

> On 15/09/2020 08:06, Guowei Ma wrote:

> I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".

> Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs the final checkpoint at the end of the job. Both of these methods could emit committables, but flush should not leave any in-progress state (e.g. in the case of the file sink in STREAM mode, snapshotState could leave some open files that would be committed in a subsequent cycle, whereas flush should close all files). The snapshotState as it is now cannot be called in prepareSnapshotPreBarrier as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:

> void prepareCommit(boolean flush, WriterOutput<CommT> output);

> ver 1:

> List<StateT> snapshotState();

> ver 2:

> void snapshotState(); // not sure if we need that method at all in option 2

> I second Dawid's proposal. This is a valid scenario. And version 2 does not need the snapshotState() any more.

> The Committer is as described in the FLIP, it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)".
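Side by side, the two commit styles described in the thread might look like the following hedged sketch. These are not the FLIP's final signatures; the two demo implementations and their field names are invented purely for illustration.

```java
import java.util.List;

// Per-committable commit: instances can run in parallel across subtasks,
// e.g. finishing individual S3 multipart uploads.
interface Committer<CommT> {
    void commit(CommT committable);
}

// Global commit: one call sees every committable of a checkpoint,
// e.g. a single Iceberg table commit.
interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables);
}

// Minimal demo implementations, for illustration only.
class CountingCommitter implements Committer<String> {
    int commits = 0;
    public void commit(String committable) { commits++; }
}

class BatchSizeGlobalCommitter implements GlobalCommitter<String> {
    int lastBatchSize = -1;
    public void commit(List<String> committables) { lastBatchSize = committables.size(); }
}
```

The split matters because the framework must decide where commits run: many parallel `commit(CommT)` calls versus one `commit(List<CommT>)` call in a single process.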
> The former would be used by an S3 sink where we can individually commit files to S3: a committable would be the list of part uploads that will form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.

> I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function. I must admit I did not get the need for Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.

> @Dawid
> What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed. But the iceberg only needs to write the manifest file. Would you like to enlighten me why the Iceberg needs the local committer? Thanks

> Best,
> Guowei

> On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi all,

> I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".

> Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs the final checkpoint at the end of the job. Both of these methods could emit committables, but flush should not leave any in-progress state (e.g. in the case of the file sink in STREAM mode, snapshotState could leave some open files that would be committed in a subsequent cycle, whereas flush should close all files). The snapshotState as it is now cannot be called in prepareSnapshotPreBarrier as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:

> void prepareCommit(boolean flush, WriterOutput<CommT> output);

> ver 1:

> List<StateT> snapshotState();

> ver 2:

> void snapshotState(); // not sure if we need that method at all in option 2

> The Committer is as described in the FLIP, it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3: a committable would be the list of part uploads that will form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.

> I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function.

> I must admit I did not get the need for Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.

> For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile". Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the collected thousands of data files) as StateT.
> This allows us to absorb extended commit outages without losing written/uploaded data files, as operator state size is as small as one manifest file per checkpoint cycle [2].

> ------------------
> StateT snapshotState(SnapshotContext context) throws Exception;

> That means we also need the restoreCommitter API in the Sink interface:

> ---------------
> Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);

> I think this might be a valid case. Not sure though if I would go with a "state" there. Having a state in a committer would imply we need a collect method as well. So far we needed a single method commit(...) and the bookkeeping of the committables could be handled by the framework. I think something like an optional combiner in the GlobalCommitter would be enough. What do you think?

> GlobalCommitter<CommT, GlobalCommT> {
>     void commit(GlobalCommT globalCommittables);
>     GlobalCommT combine(List<CommT> committables);
> }

> A different problem that I see here is how we handle commit failures. Should the committables (both normal and global) be included in the next cycle, should we retry, ...? I think it would be worth laying this out in the FLIP.

> @Aljoscha I think you can find the code Steven was referring to here:
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java

> Best,
> Dawid

> On 14/09/2020 15:19, Aljoscha Krettek wrote:

> On 14.09.20 01:23, Steven Wu wrote:

> ## Writer interface

> For the Writer interface, should we add "prepareSnapshot" before the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or would the framework call "flush" before the barrier is emitted downstream? That guarantee would achieve the same goal.
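One way to picture the flush-before-barrier guarantee being asked about here is the toy writer below. It is a sketch under assumed names (`BufferingWriterSketch`, `pollCommittables` are invented), not the actual Writer API: the framework would call prepareCommit() before emitting the checkpoint barrier, so every buffered record becomes part of a committable first and no in-progress state remains.

```java
import java.util.ArrayList;
import java.util.List;

// Toy writer illustrating flush/prepareCommit semantics: after
// prepareCommit() nothing may remain buffered (no in-progress state).
class BufferingWriterSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> committables = new ArrayList<>();

    void write(String record) {
        buffer.add(record);
    }

    // Called before the checkpoint barrier is emitted downstream:
    // turn everything in progress into a committable.
    void prepareCommit() {
        if (!buffer.isEmpty()) {
            committables.add("file(" + String.join(",", buffer) + ")");
            buffer.clear();
        }
    }

    // The framework would ship these to the committer with the checkpoint.
    List<String> pollCommittables() {
        List<String> out = new ArrayList<>(committables);
        committables.clear();
        return out;
    }
}
```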
> I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".

> @Guowei, what do you think about this?

> In [1], we discussed the reason for the Writer to emit (checkpointId, CommT) tuples to the committer. The committer needs the checkpointId to separate out data files for different checkpoints if concurrent checkpoints are enabled.

> When can this happen? Even with concurrent checkpoints the snapshot barriers would still cleanly segregate the input stream of an operator into tranches that should manifest in only one checkpoint. With concurrent checkpoints, all that can happen is that we start a checkpoint before the last one is confirmed completed.

> Unless there is some weirdness in the sources and some sources start chk1 first and some other ones start chk2 first?

> @Piotrek, do you think this is a problem?

> For the Committer interface, I am wondering if we should split the single commit method into separate "collect" and "commit" methods? This way, it can handle both single and multiple CommT objects.

> I think we can't do this. If the sink only needs a regular Committer, we can perform the commits in parallel, possibly on different machines. Only when the sink needs a GlobalCommitter would we need to ship all commits to a single process and perform the commit there. If both methods were unified in one interface we couldn't make the decision of where to commit in the framework code.

> For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile".
> Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the collected thousands of data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as operator state size is as small as one manifest file per checkpoint cycle.

> You could have a point here. Is the code for this available in open source? I was checking out

> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java

> and didn't find the ManifestFile optimization there.

> Best,
> Aljoscha
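The ManifestFile optimization Steven describes, together with Dawid's combine() idea, can be sketched as a toy where CommT is a data-file path and GlobalCommT is a "manifest" string. Iceberg's real ManifestFile is far richer than this; the class and method bodies below are invented purely to show the combine-then-commit flow that keeps checkpointed state small.

```java
import java.util.List;

// Toy combiner: fold many per-checkpoint data files into one compact
// global committable, so checkpointed state stays small even during
// extended commit outages. Purely illustrative, not Iceberg's API.
class ManifestCombinerSketch {
    // combine(List<CommT>) -> GlobalCommT
    static String combine(List<String> dataFiles) {
        return "manifest[" + String.join(",", dataFiles) + "]";
    }

    // commit(GlobalCommT): stubbed external commit; reports success.
    static boolean commit(String manifest) {
        return manifest.startsWith("manifest[");
    }
}
```

However many data files a checkpoint collects, the state the committer keeps is one combined manifest per cycle.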