Hi Liwei, I think it's indeed better to have a separate discussion thread on that.
Best regards, Martijn On Mon, Oct 10, 2022 at 7:22 AM liwei li <hilili...@gmail.com> wrote: > Thanks for the discussion. > > I favor schemes that allow for custom committer parallelism. This will help > us better use the new unified sink. > > As an added point, in the current scheme, if I want to get some task > information, such as the job ID or operator ID, in the writer and committer methods, it > is not easy: Flink does not expose this information. But this information > is useful for saving snapshot information, naming files, and so on. Should > we consider improving it? Of course, that's not what this thread is about, but > I'm bringing it up for reference. > > Thanks, > Liwei > > On Thu, Sep 29, 2022 at 02:44 Steven Wu <stevenz...@gmail.com> wrote: > > > Yun, > > > > > `writer (parallelism = n) -> committer (parallelism = 1)` > > > > Yes, a single-parallelism committer would work for Iceberg (and probably > > other storage with similar global transactional commits). This is how the Iceberg sink is > > implemented today. > > > > > When we were refactoring the job finish process in FLIP-147 to ensure > > all the records could be committed at the end of a bounded streaming job, we > > had to drop the support for cascaded commits, which means the > > cascaded commit of `committer -> global committer` does not work in all cases. > > > > This is not a concern for Iceberg, as Iceberg doesn't need cascading > > "committers -> global committers". > > > > Thanks, > > Steven > > > > > > On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yungao...@aliyun.com> wrote: > > > >> Hi all, > >> > >> Very sorry for the long delay; it took a bit of time to do the > >> investigation. > >> > >> @Krzysztof > >> The current unexpected behavior of the GlobalCommitter > >> after failover is indeed caused by bugs in the implementation. > >> I filed an issue [1] for these bugs, and I think we may fix the issue > >> for 1.15.x, 1.16.x and 1.17.x before the next minor releases.
> >> > >> @Steven @Krzysztof > >> For the long-term solution (>= 1.17) for supporting changing the parallelism > >> of the Committer operator, I initially meant that we could set the > >> parallelism of the committer operator so that it could support arbitrary > >> parallelism: > >> > >> 1. For an ordinary sink that does not require a global committer, it is still > >> `writer (parallelism = n) -> committer (parallelism = n)`. > >> 2. If the committer is not required in the v1 implementation, it could now be > >> reimplemented with `writer (parallelism = n) -> committer (parallelism = 1)`. > >> 3. If the committer is required in the v1 implementation, there are now several ways > >> to re-implement it on v2, like `writer -> pre-committer topology to do the rename -> > >> committer (parallelism = 1)`. > >> > >> Do you think this would be reasonable? > >> > >> Also, @Krzysztof, I have the same question as Steven: is it possible to directly write > >> the final files and skip the renaming step? Since the files are not visible to the users > >> before the metadata is written to the Delta Log, I guess it is safe to > >> write the final files directly? > >> > >> Best, > >> Yun Gao > >> > >> > >> [1] https://issues.apache.org/jira/browse/FLINK-29459 > >> > >> > >> > >> ------------------------------------------------------------------ > >> From:Steven Wu <stevenz...@gmail.com> > >> Send Time:2022 Sep. 14 (Wed.) 21:33 > >> To:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> > >> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; > hililiwei > >> <hilili...@gmail.com> > >> Subject:Re: Sink V2 interface replacement for GlobalCommitter > >> > >> Krzysztof, no worries. We are discussing the same topic (how to support > >> storage with globally transactional commits). > >> > >> > In Delta Sink connector we actually use both Committer [1] and > >> GlobalCommitter [2].
The former, since we are using Flink's Parquet file > >> writers, is doing a very simple job of "renaming the hidden file to make > >> it visible and removing the 'in-progress file' marker from the name". The > >> GlobalCommitter is committing data to the Delta Log. > >> > >> Curious if the writers can write the visible files directly (vs. hidden > >> files first, then renamed by the committer). Since there is a global committer > >> to commit the data files when a Flink checkpoint completes, job failure or > >> restart shouldn't cause data file dups or loss. I probably missed some > >> context here. > >> > >> On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski < > >> krzysiek.chmielew...@gmail.com> wrote: > >> Hi Yun, > >> Thanks for your input. > >> > >> In the Delta Sink connector we actually use both the Committer [1] and the > >> GlobalCommitter [2]. The former, since we are using Flink's Parquet file > >> writers, is doing a very simple job of "renaming the hidden file to make > >> it visible and removing the 'in-progress file' marker from the name". The > >> GlobalCommitter is committing data to the Delta Log. > >> > >> With this design, having many instances of Committers actually has a > >> benefit for us. Plus, we foresee some upcoming features in our connector that > >> would benefit from separate Committers with a parallelism level higher than 1. > >> > >> How I understood your suggestion, Yun (and maybe it was a wrong > >> interpretation), is to use both the Committer and the GlobalCommitter but to enforce > >> parallelism level 1 on the former. The GlobalCommitter created by Flink's > >> 1.15 SinkV1Adapter has parallelism 1, as expected and as it was in Flink < > >> 1.15. > >> > >> Anyway, I've played a little bit with the Flink code and managed to > >> achieve this [3]. After some additional changes, which I will describe > >> below, our test described in [4] passed without any data loss and no > >> exceptions thrown by Flink.
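The "hidden file, then rename" handoff between writer and committer described above can be sketched in plain Java. The ".inprogress" marker and file names here are illustrative assumptions, not the actual Flink or Delta naming scheme:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RenameCommitSketch {
    // Writer side: data goes into a hidden, marked file that readers ignore.
    static Path writeInProgress(Path dir, String name, byte[] data) throws IOException {
        Path hidden = dir.resolve("." + name + ".inprogress");
        Files.write(hidden, data);
        return hidden;
    }

    // Committer side: an atomic same-directory rename makes the file visible
    // in one step, so readers never observe a partially written file.
    static Path commit(Path hidden, String name) throws IOException {
        Path visible = hidden.resolveSibling(name);
        return Files.move(hidden, visible, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("delta-sketch");
        Path hidden = writeInProgress(dir, "part-0.parquet", new byte[]{1, 2, 3});
        Path visible = commit(hidden, "part-0.parquet");
        if (!Files.exists(visible) || Files.exists(hidden)) throw new AssertionError();
        System.out.println("committed " + visible.getFileName());
    }
}
```

As the thread discusses, this rename step is only needed when readers could otherwise see in-progress files; if visibility is gated elsewhere (e.g. by the Delta Log), writing the final file directly may be safe.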
> >> > >> However, changing the parallelism of the Committer operator to a hardcoded value of 1 > >> was not enough, and I had to do two more things: > >> 1. add a rebalance step (RebalancePartitioner) to the graph between writer and > >> committer, since they now have different parallelism levels and the default > >> FORWARD partitioner caused an exception to be thrown - BTW this is > >> clear and understood > >> 2. modify Flink's CommittableCollectorSerializer [5], and this is, I believe, > >> an important thing. > >> > >> The modification I had to make was caused by a "Duplicate Key" exception > >> from the deserialize(int version, byte[] serialized) method at line 143 of > >> [5], where we process a stream of SubtaskCommittableManager objects and > >> collect them into a Map. The map key is the subtaskId > >> from the SubtaskCommittableManager object. > >> > >> After Task Manager recovery it may happen that the List of > >> SubtaskCommittableManager objects processed in that deserialize method > >> contains two SubtaskCommittableManager entries for the same subtask ID. What I > >> did for such cases is call the SubtaskCommittableManager.merge(...) > >> method. > >> > >> With those modifications our Delta test [7] started to pass on Flink 1.15. > >> > >> I do not know whether setting the parallelism of the Committer to 1 is > >> the right thing to do. Like I mentioned, the Committer is doing some work in our > >> Sink implementation, and upcoming features we would like to add might > >> benefit from keeping the parallelism equal to the writer count. > >> > >> I still think there is some issue with the V2 architecture for topologies > >> with a GlobalCommitter in failover scenarios [4], and maybe the duplicated > >> key in [5] described above is another case - perhaps we should never have two > >> entries for the same subtaskId. That I don't know. > >> > >> P.S. > >> Steven, apologies for hijacking the thread a little bit.
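The serializer fix described above boils down to using a merging map collector instead of a plain one. A minimal stand-alone sketch, where SubtaskState is a hypothetical stand-in for Flink's SubtaskCommittableManager and the merge simply sums pending committables:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MergeOnDeserialize {
    // Hypothetical stand-in for SubtaskCommittableManager: a subtask id plus
    // a count of pending committables.
    record SubtaskState(int subtaskId, int pending) {
        SubtaskState merge(SubtaskState other) {
            return new SubtaskState(subtaskId, pending + other.pending);
        }
    }

    // Collect into a map keyed by subtaskId; on a duplicate key (which can
    // happen after recovery), merge the two entries instead of throwing
    // "Duplicate key" as the two-argument Collectors.toMap would.
    static Map<Integer, SubtaskState> bySubtask(List<SubtaskState> states) {
        return states.stream().collect(
            Collectors.toMap(SubtaskState::subtaskId, s -> s, SubtaskState::merge));
    }

    public static void main(String[] args) {
        List<SubtaskState> recovered = List.of(
            new SubtaskState(0, 2),
            new SubtaskState(1, 1),
            new SubtaskState(0, 3)); // duplicate subtask 0 after restore
        Map<Integer, SubtaskState> merged = bySubtask(recovered);
        if (merged.size() != 2 || merged.get(0).pending() != 5) throw new AssertionError();
        System.out.println("subtask 0 pending = " + merged.get(0).pending());
    }
}
```

This mirrors the described change: calling SubtaskCommittableManager.merge(...) when two entries share a subtask ID, rather than failing deserialization.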
> >> > >> Thanks, > >> Krzysztof Chmielewski > >> > >> [1] > >> > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java > >> [2] > >> > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java > >> [3] > >> > https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing > >> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc > >> [5] > >> > https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java > >> [7] > >> > https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java > >> > >> On Wed, Sep 14, 2022 at 05:26 Steven Wu <stevenz...@gmail.com> wrote: > >> > setting the committer parallelism to 1. > >> > >> Yun, setting the parallelism to 1 is essentially a global committer. That > >> would work; not sure about the implications for other parts of the v2 sink > >> interface. > >> > >> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski < > >> krzysiek.chmielew...@gmail.com> wrote: > >> Hi Martijn, > >> Could you clarify a little bit what you mean by: > >> > >> "The important part to remember is that this > >> topology is lagging one checkpoint behind in terms of fault-tolerance: it > >> only receives data once the committer committed" > >> > >> What are the implications? > >> > >> Thanks, > >> Krzysztof Chmielewski > >> > >> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid> > >> wrote: > >> Hi, > >> Very sorry for the late reply; I was on holiday.
> >> And also many thanks for the discussion; it reminds me of > >> one more piece of background on the change of the GlobalCommitter: > >> When we were refactoring the job finish process in FLIP-147 to > >> ensure all the records could be committed at the end of a bounded > >> streaming job, we had to drop the support for cascaded commits, > >> which means the cascaded commit of `committer -> global committer` does not work > >> in all cases. > >> For the current issues, one possible alternative option from my side is > >> that we > >> may support setting the committer parallelism to 1. Could this option > >> solve > >> the issue in the current scenarios? I'll also double-check whether > >> it could be implemented, and look into the failed tests Krzysztof met. > >> Best, > >> Yun > >> ------------------------------------------------------------------ > >> From:Steven Wu <stevenz...@gmail.com> > >> Send Time:2022 Sep. 10 (Sat.) 11:31 > >> To:dev <dev@flink.apache.org> > >> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com> > >> Subject:Re: Sink V2 interface replacement for GlobalCommitter > >> Martijn, thanks a lot for chiming in! > >> Here are my concerns with adding the GlobalCommitter in the > >> PostCommitTopology: > >> 1. When we use TwoPhaseCommittingSink, we would need to create a > >> noop/dummy committer. The actual Iceberg/DeltaLake commits happen in the > >> PostCommit stage. The PostCommit stage should be doing some work after the > >> commit (not for the commit). > >> 2. GlobalCommitter is marked as @deprecated. It will be removed at a > >> certain point. What then? > >> Thanks, > >> Steven > >> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski < > >> krzysiek.chmielew...@gmail.com <mailto:krzysiek.chmielew...@gmail.com > >> > >> wrote: > >> Thanks Martijn, > >> I'm actually trying to run our V1 Delta connector on Flink 1.15 using > >> SinkV1Adapter with GlobalCommitterOperator.
> >> Having said that, I might have found a potential issue with > >> GlobalCommitterOperator, checkpointing and failover recovery [1]. > >> For "normal" scenarios it does look good though. > >> Regards, > >> Krzysztof Chmielewski > >> [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc < > >> https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc > > >> On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org > >> <mailto:martijnvis...@apache.org >> > >> wrote: > >> > Hi all, > >> > > >> > A couple of bits from when work was being done on the new sink: V1 is > >> > completely simulated as V2 [1]. V2 is strictly more expressive. > >> > > >> > If there's a desire to stick to the `GlobalCommitter` interface, have a > >> > look at the StandardSinkTopologies. Or you can just add your own, more > >> > fitting PostCommitTopology. The important part to remember is that this > >> > topology is lagging one checkpoint behind in terms of fault-tolerance: > >> it > >> > only receives data once the committer committed > >> > on notifyCheckpointComplete. Thus, the global committer needs to be > >> > idempotent and able to restore the actual state on recovery. That > >> > limitation comes from Flink's checkpointing behaviour and > >> applies to > >> > both V1 and V2. GlobalCommitterOperator abstracts these issues > >> along > >> > with handling retries (i.e., commits that happen much later). So it's > >> probably > >> > a good place to start just with the standard topology.
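The idempotency requirement described above can be sketched as follows - a hypothetical committer, not the Flink API, that remembers the highest checkpoint id it has already committed (in a real sink this would be restored from the table's own log on recovery) and turns replayed commits into no-ops:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of an idempotent global committer - NOT the Flink API.
// Because the post-commit topology lags one checkpoint behind, the same
// commit can be delivered again after a failover, so replays must be safe.
public class IdempotentGlobalCommitter {
    private long lastCommittedCheckpointId = -1; // restored from the table log on recovery
    private final List<String> committed = new ArrayList<>();

    // Returns true if the commit was applied, false if it was a replay.
    public boolean commit(long checkpointId, List<String> files) {
        if (checkpointId <= lastCommittedCheckpointId) {
            return false; // already committed before the failure; skip
        }
        committed.addAll(files); // e.g. append these files to the table log
        lastCommittedCheckpointId = checkpointId;
        return true;
    }

    public List<String> committedFiles() {
        return List.copyOf(committed);
    }

    public static void main(String[] args) {
        IdempotentGlobalCommitter c = new IdempotentGlobalCommitter();
        if (!c.commit(1, List.of("part-0.parquet"))) throw new AssertionError();
        // The same checkpoint is replayed after a restart: must be a no-op.
        if (c.commit(1, List.of("part-0.parquet"))) throw new AssertionError();
        if (c.committedFiles().size() != 1) throw new AssertionError();
        System.out.println("replay was a no-op");
    }
}
```

Both Iceberg and Delta effectively get this check for free, since their logs record which snapshot/version was last committed.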
> >> > > >> > Best regards, > >> > > >> > Martijn > >> > > >> > [1] > >> > > >> > > >> > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370 > >> < > >> > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370 > >> > > >> > > >> > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski < > >> > krzysiek.chmielew...@gmail.com <mailto: krzysiek.chmielew...@gmail.com > >> >> wrote: > >> > > >> > > Hi, > >> > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source > >> > community > >> > > here [2]. > >> > > > >> > > I totally agree with Steven on this. Sink V1's GlobalCommitter is > >> > > exactly what the Flink-Delta Sink needs, since it is the place > >> where > >> > > we do the actual commit to the Delta Log, which should be done from > >> one > >> > > place/instance. > >> > > > >> > > Currently I'm evaluating V2 for our connector, and having, as Steven > >> > > described it, a "more natural, built-in concept/support of > >> GlobalCommitter > >> > > in the sink v2 interface" would be greatly appreciated. > >> > > > >> > > Cheers, > >> > > Krzysztof Chmielewski > >> > > > >> > > [1] https://github.com/kristoffSC <https://github.com/kristoffSC > > >> > > [2] https://github.com/delta-io/connectors/tree/master/flink < > >> https://github.com/delta-io/connectors/tree/master/flink > > >> > > > >> > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com <mailto: > >> stevenz...@gmail.com >> wrote: > >> > > > >> > > > Hi Yun, > >> > > > > >> > > > Thanks a lot for the reply! > >> > > > > >> > > > While we can add the global committer in the > >> WithPostCommitTopology, > >> > the > >> > > > semantics are weird.
The Commit stage actually didn't commit > >> anything > to > >> > > > the Iceberg table, and the PostCommit stage is where the Iceberg > >> commit > >> > > > happens. > >> > > > > >> > > > I just took a quick look at the DeltaLake Flink sink. It still uses > >> the V1 > >> > > sink > >> > > > interface [1]. I think it might have the same issue when switching > >> to > >> > the > >> > > > V2 sink interface. > >> > > > > >> > > > For data lake storage (like Iceberg, DeltaLake) or any storage > >> with > >> > > global > >> > > > transactional commit, it would be more natural to have a built-in > >> > > > concept/support of GlobalCommitter in the sink v2 interface. > >> > > > > >> > > > Thanks, > >> > > > Steven > >> > > > > >> > > > [1] > >> > > > > >> > > > > >> > > > >> > > >> > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java > >> < > >> > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java > >> > > >> > > > > >> > > > > >> > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao > >> <yungao...@aliyun.com.invalid> > >> > > > wrote: > >> > > > > >> > > > > Hi Steven, Liwei, > >> > > > > Very sorry for missing this mail and responding very late. > >> > > > > I think the initial thought is indeed to use > >> `WithPostCommitTopology` > >> > > as > >> > > > > a replacement for the original GlobalCommitter, and currently the > >> > > adapter of > >> > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink > >> V1 > >> > > > > interface > >> > > > > onto an implementation of `WithPostCommitTopology`. > >> > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it > >> seems to > >> > > > > me it could support both a global committer and small file > >> compaction?
> >> > We > >> > > > > might > >> > > > > have an `WithPostCommitTopology` implementation like > >> > > > > DataStream ds = add global committer; > >> > > > > if (enable file compaction) { > >> > > > > build the compaction subgraph from ds > >> > > > > } > >> > > > > Best, > >> > > > > Yun > >> > > > > [1] > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 > >> < > >> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 > >> > > >> > > > > < > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 > >> < > >> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 > >> > > >> > > > > > > >> > > > > > >> ------------------------------------------------------------------ > >> > > > > From:Steven Wu <stevenz...@gmail.com <mailto: > >> stevenz...@gmail.com >> > >> > > > > Send Time:2022 Aug. 17 (Wed.) 07:30 > >> > > > > To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org >>; > >> hililiwei <hilili...@gmail.com <mailto:hilili...@gmail.com >> > >> > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter > >> > > > > > Plus, it will disable the future capability of small file > >> > compaction > >> > > > > stage post commit. > >> > > > > I should clarify this comment. 
if we are using the > >> > > > `WithPostCommitTopology` > >> > > > > for the global committer, we would lose the capability of using the post > >> > > > commit > >> > > > > stage for small files compaction. > >> > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu < stevenz...@gmail.com > >> <mailto:stevenz...@gmail.com >> > >> > > wrote: > >> > > > > > > >> > > > > > In the V1 sink interface, there is a GlobalCommitter for > >> Iceberg. > >> > > With > >> > > > > the > >> > > > > > V2 sink interface, GlobalCommitter has been deprecated by > >> > > > > > WithPostCommitTopology. I thought the post commit stage is > >> mainly > >> > for > >> > > > > async > >> > > > > > maintenance (like compaction). > >> > > > > > > >> > > > > > Are we supposed to do something similar to the > >> > GlobalCommittingSinkAdapter? > >> > > > It > >> > > > > > seems like a temporary transition plan for bridging v1 sinks > >> to v2 > >> > > > > > interfaces. > >> > > > > > > >> > > > > > private class GlobalCommittingSinkAdapter extends > >> > > > > TwoPhaseCommittingSinkAdapter > >> > > > > > implements WithPostCommitTopology<InputT, CommT> { > >> > > > > > @Override > >> > > > > > public void > >> > > addPostCommitTopology(DataStream<CommittableMessage<CommT>> > >> > > > > committables) { > >> > > > > > StandardSinkTopologies.addGlobalCommitter( > >> > > > > > committables, > >> > > > > > GlobalCommitterAdapter::new, > >> > > > > > () -> sink.getCommittableSerializer().get()); > >> > > > > > } > >> > > > > > } > >> > > > > > > >> > > > > > > >> > > > > > In the Iceberg PR [1] for adopting the new sink interface, > >> Liwei > >> used > >> > > > the > >> > > > > > "global" partitioner to force all committables to go to a single > >> > > committer > >> > > > > > task (task 0). This effectively creates a global committer disguised > >> in > >> > the > >> > > > > > parallel committers.
It is a little weird and also can lead > to > >> > > > questions > >> > > > > > why other committer tasks are not getting any messages. Plus, > >> it > >> > will > >> > > > > > disable the future capability of small file compaction stage > >> post > >> > > > commit. > >> > > > > > Hence, I am asking what is the right approach to achieve > global > >> > > > committer > >> > > > > > behavior. > >> > > > > > > >> > > > > > Thanks, > >> > > > > > Steven > >> > > > > > > >> > > > > > [1] > >> https://github.com/apache/iceberg/pull/4904/files#r946975047 < > >> https://github.com/apache/iceberg/pull/4904/files#r946975047 > < > >> > > > > https://github.com/apache/iceberg/pull/4904/files#r946975047 < > >> https://github.com/apache/iceberg/pull/4904/files#r946975047 > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> > >> -- > liwei li >