Hi Steven, Liwei, 
Very sorry for missing this mail and responding so late.
I think the initial thought was indeed to use `WithPostCommitTopology` as
a replacement for the original GlobalCommitter, and currently the adapter of
Sink V1 on top of Sink V2 [1] also maps the GlobalCommitter in the Sink V1
interface onto an implementation of `WithPostCommitTopology`.
Since `WithPostCommitTopology` supports arbitrary subgraphs, it seems to
me that it could support both the global committer and small file compaction.
We might have a `WithPostCommitTopology` implementation like:
DataStream ds = /* add global committer */;
if (/* file compaction enabled */) {
    // build the compaction subgraph from ds
}
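To make the proposal above a bit more concrete, here is a plain-Java sketch that models the two stages as ordinary functions rather than Flink operators. It is not a Flink or Iceberg API: `DataFile`, `Committables`, `Snapshot`, `globalCommit`, `compactSmallFiles` and `buildPostCommitTopology` are hypothetical names, and the compaction policy (merge every file below a size threshold into one file) is an assumption chosen only for illustration. In a real sink, the same shape would be built from DataStream operators inside `addPostCommitTopology`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Plain-Java model (no Flink dependency) of a post-commit topology that always runs a
 * global committer and optionally chains a small-file compaction stage after it.
 * All types and methods here are hypothetical illustrations, not Flink or Iceberg APIs.
 */
public class PostCommitTopologySketch {

    /** Hypothetical data file produced by a parallel writer. */
    record DataFile(String path, long sizeBytes) {}

    /** Committables for one checkpoint, grouped by the writer subtask that produced them. */
    record Committables(long checkpointId, List<List<DataFile>> perSubtask) {}

    /** Result of a single global commit: the files that became visible in the table. */
    record Snapshot(long checkpointId, List<DataFile> files) {}

    /** Global committer stage (parallelism 1): sees every subtask's files and commits them once. */
    static Snapshot globalCommit(Committables committables) {
        List<DataFile> files = new ArrayList<>();
        committables.perSubtask().forEach(files::addAll);
        return new Snapshot(committables.checkpointId(), files);
    }

    /** Compaction stage (assumed policy): merges all files below the threshold into one file. */
    static Snapshot compactSmallFiles(Snapshot snapshot, long thresholdBytes) {
        List<DataFile> kept = new ArrayList<>();
        List<DataFile> small = new ArrayList<>();
        for (DataFile file : snapshot.files()) {
            (file.sizeBytes() < thresholdBytes ? small : kept).add(file);
        }
        if (small.size() > 1) {
            long mergedSize = small.stream().mapToLong(DataFile::sizeBytes).sum();
            kept.add(new DataFile("compacted-" + snapshot.checkpointId(), mergedSize));
        } else {
            kept.addAll(small); // zero or one small file: nothing worth merging
        }
        return new Snapshot(snapshot.checkpointId(), kept);
    }

    /** Mirrors the proposal: add the global committer, then optionally build compaction from its output. */
    static Function<Committables, Snapshot> buildPostCommitTopology(
            boolean enableCompaction, long thresholdBytes) {
        Function<Committables, Snapshot> ds = PostCommitTopologySketch::globalCommit;
        if (enableCompaction) {
            ds = ds.andThen(snapshot -> compactSmallFiles(snapshot, thresholdBytes));
        }
        return ds;
    }

    public static void main(String[] args) {
        Committables input = new Committables(7L, List.of(
                List.of(new DataFile("a", 10), new DataFile("b", 500)),
                List.of(new DataFile("c", 20)),
                List.<DataFile>of()));
        // Without compaction the snapshot holds a, b and c; with it, a and c are merged into one file.
        System.out.println(buildPostCommitTopology(false, 100).apply(input).files());
        System.out.println(buildPostCommitTopology(true, 100).apply(input).files());
    }
}
```

The point of this shape is that compaction consumes the global committer's output, so a single post-commit topology can host both stages instead of forcing a choice between them.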
Best,
Yun
[1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
------------------------------------------------------------------
From:Steven Wu <stevenz...@gmail.com>
Send Time:2022 Aug. 17 (Wed.) 07:30
To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
> Plus, it will disable the future capability of small file compaction
> stage post commit.
I should clarify this comment: if we use `WithPostCommitTopology` for the
global committer, we would lose the capability of using the post-commit
stage for small file compaction.
On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> In the V1 sink interface, the Iceberg sink uses a GlobalCommitter. In the
> V2 sink interface, GlobalCommitter has been deprecated in favor of
> WithPostCommitTopology. I thought the post-commit stage was mainly for async
> maintenance (like compaction).
>
> Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
> It seems like a temporary transition plan for bridging V1 sinks to V2
> interfaces.
>
> private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>         implements WithPostCommitTopology<InputT, CommT> {
>     @Override
>     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>         StandardSinkTopologies.addGlobalCommitter(
>                 committables,
>                 GlobalCommitterAdapter::new,
>                 () -> sink.getCommittableSerializer().get());
>     }
> }
>
>
> In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> "global" partitioner to force all committables to go to a single committer
> task (task 0). This effectively forces a global committer disguised as
> parallel committers. It is a little weird and can also lead to questions
> about why the other committer tasks are not getting any messages. Plus, it
> will disable the future capability of small file compaction stage post commit.
> Hence, I am asking what the right approach is to achieve global committer
> behavior.
>
> Thanks,
> Steven
>
> [1] https://github.com/apache/iceberg/pull/4904/files#r946975047 
>
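For the "global" partitioner approach described in the quoted message, the following plain-Java sketch shows why only one of the parallel committers does any work. It assumes the partitioner behaves like Flink's `DataStream#global()`, which sends every record to the first instance of the next operator; `GlobalPartitionerSketch`, `selectChannel` and `route` are hypothetical names, and no Flink classes are used.

```java
import java.util.Arrays;

/**
 * Plain-Java model (no Flink dependency) of committables routed through a "global"
 * partitioner to parallel committer subtasks. Names are hypothetical illustrations.
 */
public class GlobalPartitionerSketch {

    /** Like a global partitioner: always picks the first downstream channel, whatever the parallelism. */
    static int selectChannel(int numberOfChannels) {
        return 0;
    }

    /** Counts how many committables each parallel committer subtask receives. */
    static int[] route(int numCommittables, int committerParallelism) {
        int[] received = new int[committerParallelism];
        for (int i = 0; i < numCommittables; i++) {
            received[selectChannel(committerParallelism)]++;
        }
        return received;
    }

    public static void main(String[] args) {
        // 8 committables, 4 committer subtasks: subtask 0 gets all of them, subtasks 1-3 stay idle.
        System.out.println(Arrays.toString(route(8, 4))); // prints [8, 0, 0, 0]
    }
}
```

Whatever the committer parallelism, the remaining subtasks stay idle, which is the disguised global committer effect described above.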
