Yun, > `writer (parallelism = n) -> committer (parallelism = 1)`
Yes, a single-parallelism committer would work for Iceberg (and probably other storage with similar global transactions). This is how the Iceberg sink is implemented today. > When we were refactoring the job finish process in FLIP-147 to ensure all the records can be committed at the end of a bounded streaming job, we had to drop support for cascaded commits, which makes the cascaded commit of `committer -> global committer` not work in all cases. This is not a concern for Iceberg, as Iceberg doesn't need cascading "committers -> global committers". Thanks, Steven On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yungao...@aliyun.com> wrote: > Hi all, > > Very sorry for the long delay; it took a bit of time to do the > investigation. > > @Krzysztof > The current unexpected behavior of the GlobalCommitter > after failover is indeed caused by bugs in the implementation. > I filed an issue [1] for these bugs and I think we may fix it > for 1.15.x, 1.16.x and 1.17.x before the next minor releases. > > @Steven @Krzysztof > As the long-term solution for >= 1.17, to support changing the parallelism > of the Committer operator, I initially meant that we could set the > parallelism of the committer operator so that it could support arbitrary > parallelism: > > 1. For an ordinary sink that does not require a global committer, it is still > `writer (parallelism = n) -> committer (parallelism = n)`. > 2. If the committer is not required in the v1 implementation, it could now be > reimplemented as `writer (parallelism = n) -> committer (parallelism = 1)`. > 3. If the committer is required in the v1 implementation, there are now > several ways to re-implement it on v2, like `writer -> pre-commit topology to do > the rename -> committer (parallelism = 1)`. > > Do you think this would be reasonable? > > Also @Krzysztof, I have the same question as Steven: is it possible > to write directly to the final files and skip the renaming step? 
Since the files are not visible to users before the meta is > written to the Delta Log, > I guess it is safe to > write directly to the final files? > > Best, > Yun Gao > > > [1] https://issues.apache.org/jira/browse/FLINK-29459 > > > > ------------------------------------------------------------------ > From:Steven Wu <stevenz...@gmail.com> > Send Time:2022 Sep. 14 (Wed.) 21:33 > To:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> > Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; hililiwei < > hilili...@gmail.com> > Subject:Re: Sink V2 interface replacement for GlobalCommitter > > Krzysztof, no worries. We are discussing the same topic (how to support > storage with globally transactional commits). > > > In the Delta Sink connector we actually use both Committer [1] and > GlobalCommitter [2]. The former, since we are using Flink's Parquet file > writers, is doing a very simple job of "renaming the hidden file to make > it visible and removing from the name some 'in-progress file' marker". The > GlobalCommitter is committing data to the Delta Log. > > Curious if the writers can write the visible files directly (vs. hidden > files first, then renamed by the committer). Since there is a global committer > to commit the data files when a Flink checkpoint completes, job failure or > restart shouldn't cause data file dups or loss. I probably missed some > context here. > > On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski < > krzysiek.chmielew...@gmail.com> wrote: > Hi Yun, > Thanks for your input. > > In the Delta Sink connector we actually use both Committer [1] and > GlobalCommitter [2]. The former, since we are using Flink's Parquet file > writers, is doing a very simple job of "renaming the hidden file to make > it visible and removing from the name some 'in-progress file' marker". The > GlobalCommitter is committing data to the Delta Log. > > With this design, having many instances of Committers actually has a > benefit for us. 
Plus, we foresee some future features in our connector that > would benefit from separate Committers with a parallelism level higher than 1. > > How I understood your suggestion, Yun (and maybe it was a wrong > interpretation), is to use both Committer and GlobalCommitter but to enforce > parallelism level 1 on the former. The GlobalCommitter created by Flink > 1.15's SinkV1Adapter has parallelism 1, as expected and as it was in Flink < > 1.15. > > Anyway, I've played a little with the Flink code and managed to > achieve this [3]. After some additional changes which I will describe > below, our test described in [4] passed without any data loss and no > exceptions thrown by Flink. > > However, changing the parallelism of the Committer operator to a hardcoded value of 1 > was not enough, and I had to do two more things: > 1. Add a rebalance step (RebalancePartitioner) to the graph between writer and > committer, since they now have different parallelism levels and the default > partitioner was FORWARD, which caused an exception to be thrown - BTW this is > clear and understood. > 2. Modify Flink's CommittableCollectorSerializer [5] - and this is, I believe, > an important thing. > > The modification I had to make was caused by a "Duplicate Key" exception > from the deserialize(int version, byte[] serialized) method at line 143 of > [5], where we process a stream of SubtaskCommittableManager objects and > collect it into a Map. The map key is the subtaskId > from the SubtaskCommittableManager object. > > After Task Manager recovery it may happen that the List of > SubtaskCommittableManager objects processed in that deserialize method > contains two SubtaskCommittableManagers for the same subtask ID. What I > did is, for such a case, call the SubtaskCommittableManager.merge(...) > method. > > With those modifications our Delta test [7] started to pass on Flink 1.15. > > I do not know whether setting the parallelism level of the Committer to 1 is the > right thing to do. 
Like I mentioned, the Committer is doing some work in our > Sink implementation, and we might have more use for it in future features we > would like to add that would benefit from keeping the parallelism level equal > to the writer count. > > I still think there is some issue with the V2 architecture for topologies > with a GlobalCommitter in failover scenarios [4], and even the duplicated > key in [5] described above is another case - maybe we should never have two > entries for the same subtaskId. That I don't know. > > P.S. > Steven, apologies for hijacking the thread a little bit. > > Thanks, > Krzysztof Chmielewski > > [1] > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java > [2] > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java > [3] > https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing > [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc > [5] > https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java > [7] > https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java > > On Wed, Sep 14, 2022 at 05:26 Steven Wu <stevenz...@gmail.com> wrote: > > setting the committer parallelism to 1. > > Yun, setting the parallelism to 1 is essentially a global committer. That > would work. Not sure about the implications for other parts of the v2 sink > interface. 
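[Editor's note] The "Duplicate Key" fix Krzysztof describes can be modeled in a few lines of plain JDK code. This is a toy sketch, not Flink's actual CommittableCollectorSerializer: `SubtaskState` is a hypothetical stand-in for SubtaskCommittableManager, and the point is only that `Collectors.toMap` with a merge function resolves a repeated subtaskId instead of throwing `IllegalStateException("Duplicate key ...")`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the described fix: when restored state contains two entries
// for the same subtask ID, merge them instead of failing on a duplicate key.
public class MergeOnDuplicate {

    // Hypothetical stand-in for SubtaskCommittableManager.
    public record SubtaskState(int subtaskId, List<String> committables) {
        // Stand-in for SubtaskCommittableManager.merge(...): concatenate committables.
        public SubtaskState merge(SubtaskState other) {
            var all = new java.util.ArrayList<>(committables);
            all.addAll(other.committables());
            return new SubtaskState(subtaskId, all);
        }
    }

    // Collectors.toMap WITHOUT the third (merge) argument throws
    // IllegalStateException on a repeated subtaskId; passing SubtaskState::merge
    // resolves the collision instead.
    public static Map<Integer, SubtaskState> deserialize(List<SubtaskState> entries) {
        return entries.stream()
                .collect(Collectors.toMap(SubtaskState::subtaskId,
                                          s -> s,
                                          SubtaskState::merge));
    }

    public static void main(String[] args) {
        // Two entries for subtask 0, as can happen after Task Manager recovery.
        var merged = deserialize(List.of(
                new SubtaskState(0, List.of("file-a")),
                new SubtaskState(0, List.of("file-b")),
                new SubtaskState(1, List.of("file-c"))));
        System.out.println(merged.get(0).committables()); // [file-a, file-b]
        System.out.println(merged.size());                // 2
    }
}
```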
> > On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski < > krzysiek.chmielew...@gmail.com> wrote: > Hi Martijn, > Could you clarify a little bit what you mean by: > > "The important part to remember is that this > topology is lagging one checkpoint behind in terms of fault-tolerance: it > only receives data once the committer committed" > > What are the implications? > > Thanks, > Krzysztof Chmielewski > > On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid> > wrote: > Hi, > Very sorry for the late reply; I was on holiday. > And also many thanks for the discussion - it reminds me of > one more piece of background on the change to the GlobalCommitter: > When we were refactoring the job finish process in FLIP-147 to > ensure all the records can be committed at the end of a bounded > streaming job, we had to drop support for cascaded commits, > which makes the cascaded commit of `committer -> global committer` not work > in all cases. > For the current issues, one possible alternative option from my side is > that we > may support setting the committer parallelism to 1. Could this option > solve > the issue in the current scenarios? I'll also double-check whether > it can be implemented, and look into the failed tests Krzysztof met. > Best, > Yun > ------------------------------------------------------------------ > From:Steven Wu <stevenz...@gmail.com> > Send Time:2022 Sep. 10 (Sat.) 11:31 > To:dev <dev@flink.apache.org> > Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com> > Subject:Re: Sink V2 interface replacement for GlobalCommitter > Martijn, thanks a lot for chiming in! > Here are my concerns with adding the GlobalCommitter in the PostCommitTopology: > 1. When we use TwoPhaseCommittingSink, we would need to create a > no-op/dummy committer. The actual Iceberg/DeltaLake commits happen in the > PostCommit stage. The PostCommit stage should be doing some work after the > commit (not for the commit). > 2. 
GlobalCommitter is marked as @deprecated. It will be removed at a > certain point. What then? > Thanks, > Steven > On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski < > krzysiek.chmielew...@gmail.com> > wrote: > Thanks Martijn, > I'm actually trying to run our V1 Delta connector on Flink 1.15 using > SinkV1Adapter with GlobalCommitterOperator. > Having said that, I might have found a potential issue with > GlobalCommitterOperator, checkpointing and failover recovery [1]. > For "normal" scenarios it does look good though. > Regards, > Krzysztof Chmielewski > [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc > > On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org> > wrote: > > Hi all, > > > > A couple of bits from when work was being done on the new sink: V1 is > > completely simulated as V2 [1]. V2 is strictly more expressive. > > > > If there's a desire to stick to the `GlobalCommitter` interface, have a > > look at the StandardSinkTopologies. Or you can just add your own, more > > fitting PostCommitTopology. The important part to remember is that this > > topology lags one checkpoint behind in terms of fault-tolerance: > it > > only receives data once the committer has committed > > on notifyCheckpointComplete. Thus, the global committer needs to be > > idempotent and able to restore the actual state on recovery. That > > limitation comes from Flink's checkpointing behaviour and > applies to > > both V1 and V2. GlobalCommitterOperator abstracts these issues away, > along > > with handling retries (so commits that happen much later). So it's > probably > > a good place to start, just with the standard topology. 
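[Editor's note] The idempotence requirement Martijn raises can be illustrated with a small sketch. This is not Flink or Delta/Iceberg code: `TableLog` is a hypothetical stand-in for transactional table metadata (Delta Log, Iceberg snapshots) that records the highest checkpoint id committed so far, which lets a replayed commit after recovery become a no-op:

```java
import java.util.List;

// Minimal sketch of an idempotent global commit: the post-commit stage may
// replay the last checkpoint's committables after a failure, so the committer
// must detect and skip work it has already done.
public class IdempotentGlobalCommitter {

    // Hypothetical stand-in for transactional table metadata.
    public static class TableLog {
        private long lastCommittedCheckpointId = -1;
        private final java.util.List<String> committedFiles = new java.util.ArrayList<>();

        // Record the files for a checkpoint; skip checkpoints already committed.
        public boolean commit(long checkpointId, List<String> files) {
            if (checkpointId <= lastCommittedCheckpointId) {
                return false; // committed before the failure; no-op on replay
            }
            committedFiles.addAll(files);
            lastCommittedCheckpointId = checkpointId;
            return true;
        }

        public List<String> committedFiles() { return committedFiles; }
    }

    public static void main(String[] args) {
        TableLog log = new TableLog();
        log.commit(1, List.of("file-a"));                    // normal commit
        boolean applied = log.commit(1, List.of("file-a"));  // replay after recovery
        System.out.println(applied);                 // false: duplicate skipped
        System.out.println(log.committedFiles());    // [file-a]
    }
}
```

A real implementation would restore `lastCommittedCheckpointId` from the storage's own metadata on recovery rather than from operator state, since the state may be one checkpoint behind.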
> > > > Best regards, > > > > Martijn > > > > [1] > > > > > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370 > > > > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski < > > krzysiek.chmielew...@gmail.com> wrote: > > > > > Hi, > > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source > > community > > > here [2]. > > > > > > I totally agree with Steven on this. Sink V1's GlobalCommitter is > > > exactly what the Flink-Delta Sink needs, since it is the place where > > > we do the actual commit to the Delta Log, which should be done from one > > > place/instance. > > > > > > Currently I'm evaluating V2 for our connector, and having, as Steven > > > described it, a "more natural, built-in concept/support of GlobalCommitter > > > in the sink v2 interface" would be greatly appreciated. > > > > > > Cheers, > > > Krzysztof Chmielewski > > > > > > [1] https://github.com/kristoffSC > > > [2] https://github.com/delta-io/connectors/tree/master/flink > > > > > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote: > > > > > > > Hi Yun, > > > > > > > > Thanks a lot for the reply! > > > > > > > > While we can add the global committer in the WithPostCommitTopology, > > the > > > > semantics are weird. The Commit stage actually didn't commit > anything > > to > > > > the Iceberg table, and the PostCommit stage is where the Iceberg > commit > > > > happens. > > > > > > > > I just took a quick look at the DeltaLake Flink sink. 
It still uses the V1 > > > sink > > > > interface [1]. I think it might have the same issue when switching > to > > the > > > > V2 sink interface. > > > > > > > > For data lake storage (like Iceberg, DeltaLake) or any storage with a > > > global > > > > transactional commit, it would be more natural to have a built-in > > > > concept/support of GlobalCommitter in the sink v2 interface. > > > > > > > > Thanks, > > > > Steven > > > > > > > > [1] > > > > > > > > > > > > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java > > > > > > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid > > > > > > wrote: > > > > > Hi Steven, Liwei, > > > > > Very sorry for missing this mail and responding very late. > > > > > I think the initial thought was indeed to use > `WithPostCommitTopology` > > > as > > > > > a replacement for the original GlobalCommitter, and currently the > > > adapter > > > > of > > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter of the Sink V1 > > > > > interface > > > > > onto an implementation of `WithPostCommitTopology`. > > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it > > > seems > > > > > to > > > > > me it could support both the global committer and small file > compaction? 
> > We > > > > > might > > > > > have a `WithPostCommitTopology` implementation like: > > > > > DataStream ds = add global committer; > > > > > if (enable file compaction) { > > > > > build the compaction subgraph from ds > > > > > } > > > > > Best, > > > > > Yun > > > > > [1] > > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365 > > > > > ------------------------------------------------------------------ > > > > > From:Steven Wu <stevenz...@gmail.com> > > > > > Send Time:2022 Aug. 17 (Wed.) 07:30 > > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com> > > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter > > > > > > Plus, it will disable the future capability of a small file > > compaction > > > > > stage post commit. > > > > > I should clarify this comment: if we are using > > > > `WithPostCommitTopology` > > > > > for the global committer, we would lose the capability of using the > post > > > > commit > > > > > stage for small file compaction. 
> > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> > > > wrote: > > > > > > > > > > > > In the V1 sink interface, there is a GlobalCommitter for > Iceberg. > > > With > > > > > the > > > > > > V2 sink interface, GlobalCommitter has been deprecated by > > > > > > WithPostCommitTopology. I thought the post commit stage was > mainly > > for > > > > > async > > > > > > maintenance (like compaction). > > > > > > > > > > > > Are we supposed to do something similar to the > > GlobalCommittingSinkAdapter? > > > > It > > > > > > seems like a temporary transition plan for bridging v1 sinks to > v2 > > > > > > interfaces. > > > > > > > > > > > > private class GlobalCommittingSinkAdapter extends > > > > > TwoPhaseCommittingSinkAdapter > > > > > > implements WithPostCommitTopology<InputT, CommT> { > > > > > > @Override > > > > > > public void > > > addPostCommitTopology(DataStream<CommittableMessage<CommT>> > > > > > committables) { > > > > > > StandardSinkTopologies.addGlobalCommitter( > > > > > > committables, > > > > > > GlobalCommitterAdapter::new, > > > > > > () -> sink.getCommittableSerializer().get()); > > > > > > } > > > > > > } > > > > > > > > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei > > used > > > > the > > > > > > "global" partitioner to force all committables to go to a single > > > committer > > > > > > task 0. It effectively forces a global committer disguised within > > the > > > > > > parallel committers. It is a little weird and can also lead to > > > > questions > > > > > > about why other committer tasks are not getting any messages. Plus, it > > will > > > > > > disable the future capability of a small file compaction stage > post > > > > commit. > > > > > > Hence, I am asking what the right approach is to achieve global > > > > committer > > > > > > behavior. 
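[Editor's note] The "global" partitioner trick Steven describes can be sketched with plain JDK code. `GlobalPartitionerSketch` and `distribute` are illustrative names modeling channel selection, not Flink's partitioner API; the point is that a partitioner returning a constant 0 funnels every committable to one subtask, leaving the other parallel committer instances idle:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Sketch of forcing global-committer behavior through partitioning: route
// every committable to subtask 0 so a single committer instance does all
// the commits.
public class GlobalPartitionerSketch {

    // A "global" partitioner ignores the record and always selects channel 0.
    public static final ToIntFunction<String> GLOBAL = committable -> 0;

    // Distribute committables across numChannels using the given partitioner.
    public static Map<Integer, List<String>> distribute(List<String> committables,
                                                        int numChannels,
                                                        ToIntFunction<String> partitioner) {
        Map<Integer, List<String>> channels = new TreeMap<>();
        for (int i = 0; i < numChannels; i++) {
            channels.put(i, new java.util.ArrayList<>());
        }
        for (String c : committables) {
            channels.get(partitioner.applyAsInt(c)).add(c);
        }
        return channels;
    }

    public static void main(String[] args) {
        var channels = distribute(List.of("data-1.parquet", "data-2.parquet",
                                          "data-3.parquet"), 4, GLOBAL);
        // Everything lands on task 0; tasks 1..3 stay idle, which is exactly
        // why this disguised global committer looks odd next to parallel
        // committer tasks that never receive messages.
        System.out.println(channels.get(0).size()); // 3
        System.out.println(channels.get(1).size()); // 0
    }
}
```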
> > > > > > > > > > > > Thanks, > > > > > > Steven > > > > > > > > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047