Hi Fabian,

Thanks for drafting the FLIP and working to support small file compaction. I
think this feature is urgently needed and valuable for users (at least for me).

Currently I am working on streaming rewrite (compaction) for Iceberg in
PR#3323 <https://github.com/apache/iceberg/pull/3323>. As Steven mentioned,
Iceberg sinks and compacts data through the following steps (a rough sketch
of this pipeline follows the list):
Step-1: Several parallel data writers (sinkers) write the streaming data as files.
Step-2: A single-parallelism data file committer commits the completed
files as soon as possible to make them available.
Step-3: Several parallel file rewriters (compactors) collect committed files
from multiple checkpoints and rewrite (compact) them together once the
total file size or the number of files reaches the threshold.
Step-4: A single-parallelism rewrite (compact) result committer commits
the rewritten (compacted) files to replace the old files and make them
available.
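To make the shape of this pipeline concrete, here is a minimal, framework-agnostic sketch of the four steps. All names (DataFile, RewriteResult, the interfaces themselves) are hypothetical and only illustrate the structure, not the actual Iceberg or Flink API:

import java.util.List;

// Hypothetical model of the four-step pipeline; not the real Iceberg/Flink API.
public interface CompactingSinkModel {

    // Step-1: parallel writers turn incoming records into completed data files.
    interface Writer<RowT> {
        List<DataFile> writeAndRoll(List<RowT> records);
    }

    // Step-2: a single-parallelism committer publishes completed files per checkpoint.
    interface AppendCommitter {
        void commitAppend(long checkpointId, List<DataFile> completedFiles);
    }

    // Step-3: parallel rewriters collect committed files across checkpoints and
    // rewrite them once a size/count threshold is reached.
    interface Rewriter {
        void collect(List<DataFile> committedFiles);
        List<RewriteResult> rewriteIfThresholdReached();
    }

    // Step-4: a single-parallelism committer atomically replaces the old files
    // with the rewritten ones.
    interface RewriteCommitter {
        void commitRewrite(List<RewriteResult> results);
    }

    record DataFile(String path, String partition, long sizeInBytes) {}
    record RewriteResult(List<DataFile> replacedFiles, List<DataFile> addedFiles) {}
}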


If Flink wants to support small file compaction, I think the following key
points are necessary:

1. Compact files from multiple checkpoints.
I totally agree with Jingsong: the completed files usually cannot reach the
threshold within a single checkpoint. This is especially true for partitioned
tables, where we need to compact the files of each partition separately, but
the file sizes of the partitions differ and often do not reach the merge
threshold. If we compact these files within a single checkpoint, regardless of
whether the total file size reaches the threshold, the value of compaction is
diminished and we still end up with small files, because the compacted files
do not reach the target size. So we need the compactor to collect committed
files across multiple checkpoints and compact them only once they reach the
threshold (see the sketch below).
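A minimal sketch of such a cross-checkpoint planner (all names are hypothetical, thresholds configurable) could look like this:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner that accumulates committed files per partition across
// checkpoints and only plans a compaction once a partition reaches the threshold.
public class PartitionCompactionPlanner {

    public record FileMeta(String path, String partition, long sizeInBytes) {}

    private final long targetSizeBytes;
    private final int minFilesToCompact;
    // Survives across checkpoints (and would have to be checkpointed, see point 3).
    private final Map<String, List<FileMeta>> pendingPerPartition = new HashMap<>();

    public PartitionCompactionPlanner(long targetSizeBytes, int minFilesToCompact) {
        this.targetSizeBytes = targetSizeBytes;
        this.minFilesToCompact = minFilesToCompact;
    }

    /** Called once per checkpoint with the files committed in that checkpoint. */
    public List<List<FileMeta>> onCheckpointCommitted(List<FileMeta> committedFiles) {
        for (FileMeta file : committedFiles) {
            pendingPerPartition.computeIfAbsent(file.partition(), p -> new ArrayList<>()).add(file);
        }
        List<List<FileMeta>> compactionPlans = new ArrayList<>();
        for (List<FileMeta> files : pendingPerPartition.values()) {
            long totalSize = files.stream().mapToLong(FileMeta::sizeInBytes).sum();
            if (totalSize >= targetSizeBytes || files.size() >= minFilesToCompact) {
                compactionPlans.add(new ArrayList<>(files));
                files.clear(); // these files are now owned by a compaction task
            }
        }
        return compactionPlans;
    }
}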

2. Separate the write phase and the compact phase.
Users usually want the data to become available as soon as possible, so the
end-to-end latency is very important. I think we need to separate the write
phase and the compact phase. The write phase consists of Step-1 and Step-2:
we sink the data as files and commit them at every checkpoint, regardless of
their size. That ensures the data becomes available as soon as possible. The
compact phase consists of Step-3 and Step-4: the compactor collects committed
files from multiple checkpoints and compacts them asynchronously once they
reach the threshold, and the compact committer commits the compaction result
in the next checkpoint. We compact the committed files asynchronously because
we don't want the compaction to slow down the data sink or the whole pipeline
(see the sketch below).
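To illustrate the asynchronous part, here is a rough sketch (hypothetical names, plain Java) where the read-merge-write runs on a separate thread pool so it cannot stall the writer/committer path, and the finished results are drained by the rewrite committer at the next checkpoint:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical rewriter: compaction I/O runs off the main processing thread.
public class AsyncRewriter implements AutoCloseable {

    private final ExecutorService rewritePool = Executors.newFixedThreadPool(4);
    private final ConcurrentLinkedQueue<String> finishedRewrites = new ConcurrentLinkedQueue<>();

    /** Kicks off a rewrite for one planned group of small files; returns immediately. */
    public void rewriteAsync(List<String> smallFiles) {
        CompletableFuture
                .supplyAsync(() -> mergeFiles(smallFiles), rewritePool)
                .thenAccept(finishedRewrites::add);
    }

    /** Drained by the single-parallelism rewrite committer on the next checkpoint. */
    public List<String> pollFinishedRewrites() {
        List<String> results = new ArrayList<>();
        String rewritten;
        while ((rewritten = finishedRewrites.poll()) != null) {
            results.add(rewritten);
        }
        return results;
    }

    private String mergeFiles(List<String> smallFiles) {
        // Placeholder for the real read-merge-write of the table format.
        return "compacted-" + Math.abs(smallFiles.hashCode()) + ".parquet";
    }

    @Override
    public void close() {
        rewritePool.shutdown();
    }
}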

3. Exactly-once guarantee between the write phase and the compact phase.
Once we separate the write phase and the compact phase, we need to consider
how to guarantee exactly-once semantics between the two phases. We must not
lose any data or files in the compactor (Step-3) under any circumstances,
because that would make the compaction result inconsistent with the data
written before. I think Flink should provide an easy-to-use interface to make
that easier (see the sketch below).
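As one possible pattern (only a sketch with assumed names, not a proposal for the final interface), the compactor could keep its not-yet-compacted file list in Flink operator state, so a failover between the two phases neither loses nor duplicates files:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Hypothetical compactor operator that checkpoints its pending file list.
public class PendingFilesCompactor extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    // Paths of committed files that have not been handed to a compaction task yet.
    private transient ListState<String> pendingFilesState;
    private final List<String> pendingFiles = new ArrayList<>();

    @Override
    public void processElement(String committedFile, Context ctx, Collector<String> out) {
        pendingFiles.add(committedFile);
        // Real logic would check thresholds here and emit a compaction plan via `out`.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Everything not yet compacted goes into the checkpoint.
        pendingFilesState.update(new ArrayList<>(pendingFiles));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingFilesState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-files", String.class));
        if (context.isRestored()) {
            pendingFiles.clear();
            pendingFilesState.get().forEach(pendingFiles::add);
        }
    }
}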

4. Metadata operations and compaction result validation.
The compact phase may involve not only compacting files but also many metadata
operations, for example Iceberg needs to read/write manifests and perform
merge-on-read (MOR). We also need an interface that lets users validate the
compaction result. I think these points should be considered when we design
the compaction API (see the sketch below).
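For example (hypothetical interface, just to illustrate the kind of hook I have in mind), the rewrite committer could run user-provided checks before the compaction result is committed; a table format like Iceberg would additionally rewrite manifests and apply MOR at this point:

import java.util.List;

// Hypothetical validation hook for compaction results.
public interface CompactionResultValidator {

    record FileStats(String path, long recordCount) {}

    /** Throw to fail the rewrite commit when the compacted data is inconsistent. */
    void validate(List<FileStats> replacedFiles, List<FileStats> addedFiles);

    /** Example check: compaction must not change the total row count. */
    static CompactionResultValidator rowCountPreserved() {
        return (replaced, added) -> {
            long before = replaced.stream().mapToLong(FileStats::recordCount).sum();
            long after = added.stream().mapToLong(FileStats::recordCount).sum();
            if (before != after) {
                throw new IllegalStateException(
                        "Compaction changed the row count: " + before + " -> " + after);
            }
        };
    }
}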


Back to FLIP-191: option 1 looks very complicated, while option 2 is
relatively simple, but neither of the two solutions separates the write
phase from the compact phase. So I think we should consider the points
mentioned above. If you have any other questions, feel free to reach out
to me!

BR,
Reo


On 2021/11/07 05:20:36 Steven Wu wrote:
> Fabian, thanks a lot for the proposal and starting the discussion.
>
> We probably should first describe the different causes of small files and
> what problems was this proposal trying to solve. I wrote a data shuffling
> proposal [1] for Flink Iceberg sink (shared with Iceberg community [2]). It
> can address small files problems due to skewed data distribution across
> Iceberg table partitions. Streaming shuffling before writers (to files) is
> typically more efficient than post-write file compaction (which involves
> read-merge-write). It is usually cheaper to prevent a problem (small files)
> than fixing it.
>
> The sink coordinator checkpoint problem (mentioned in option 1) would be
> great if Flink can address it. In the spirit of source (enumerator-reader)
> and sink (writer-coordinator) duality, sink coordinator checkpoint should
> happen after the writer operator. This would be a natural fit to support
> global committer in FLIP-143. It is probably an orthogonal matter to this
> proposal.
>
> Personally, I am usually in favor of keeping streaming ingestion (to data
> lake) relatively simple and stable. Also sometimes compaction and sorting
> are performed together in data rewrite maintenance jobs to improve read
> performance. In that case, the value of compacting (in Flink streaming
> ingestion) diminishes.
>
> Currently, it is unclear from the doc and this thread where the compaction
> is actually happening. Jingsong's reply described one model
> writer (parallel) -> aggregator (single-parallelism compaction planner) ->
> compactor (parallel) -> global committer (single-parallelism)
>
> In the Iceberg community, the following model has been discussed. It is
> better for Iceberg because it won't delay the data availability.
> writer (parallel) -> global committer for append (single parallelism) ->
> compactor (parallel) -> global committer for rewrite commit (single
> parallelism)
>
> Thanks,
> Steven
>
> [1]
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
> [2] https://www.mail-archive.com/dev@iceberg.apache.org/msg02889.html
>
>
>
>
> On Thu, Nov 4, 2021 at 4:46 AM Yun Gao <yu...@aliyun.com.invalid> wrote:
>
> > Hi all,
> >
> > Very thanks for Fabian drafting the FLIP and the warm discussion!
> >
> > I'd like to complement some points based on the previous discussion:
> >
> > 1. Regarding the case of trans-checkpoints merging
> >
> > I agree with that all the options would not block the current checkpoint
> > that producing the files to commit, but I think Guowei is referring to
> > another
> > issue: suppose the files are created in checkpoint 1 to 10 and we want to merge
> > the files created in 10 checkpoints, then if with arbitrary
> > topology we might merge the files during checkpoint 11 to 20, without
> > blocking
> > the following checkpoints. But if the compaction happens in
> > Committer#commit
> > as the option 2, I think perhaps with the current mechanism the commit
> > need to
> > be finished before checkpoint 11.
> >
> > 2. Regarding Commit / Global commit
> >
> > As a whole, I think perhaps whether we have compaction should be
> > independent from whether we have the global committer ? The global
> > committer is initially used to write the bucket meta after all the files
> > of a single bucket is committed. Thus
> > a) If there are failure & retry, the global commit should be wait.
> > b) I think with all the options, the committable should represents a file
> > after compaction. It might be directly a file after compaction or a list of
> > small files that wait to commit. Also, we would not need to wait for another
> > checkpoint if we only use it in meta-writing cases. But still, I think the
> > behavior here does not change whether we have compaction ?
> >
> > In fact, perhaps a better abstraction from my view is to remove the
> > GlobalCommitter directly and only have one-level of committer.  If users
> > need
> > writing meta, then the action of writing metadata should be viewed as
> > "commit". The users could write to the formal files freely, if there are
> > failover, he could directly remove the unnecessary ones since all these
> > files are invisible yet.
> > But this might be a different topic.
> >
> > 3. Regarding the comparison of the API
> >
> > I totally agree with that for the specific case of compaction, option 2 would
> > indeed be easy to use since we have considered this case when we designed
> > the new API. But as a whole, from another view, I think perhaps writing a
> > stream / batch unified program with the DataStream API should not be that
> > hard? It does not increase more difficulty compared to writing a normal
> > stream / batch unified flink job. For the specific issues we mentioned, I think
> > based on the previous discussion, we should finally add `finish()` to the
> > UDF, and for now I think we could at least first add it to family of
> > `ProcessFunction`.
> >
> > Best,
> > Yun
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Arvid Heise <ar...@apache.org>
> > Send Time:2021 Nov. 4 (Thu.) 16:55
> > To:Till Rohrmann <tr...@apache.org>
> > Cc:dev <de...@flink.apache.org>; "David Morávek" <dm...@apache.org>
> > Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
> > small file compaction
> >
> > >
> > > Emitting records for downstream operators in or after
> > > notifyCheckpointComplete no longer works after FLIP-147 when executing the
> > > final checkpoint. The problem is that the final checkpoint happens after
> > > the EOI and we would like to keep the property that you can terminate the
> > > whole topology with a single checkpoint, if possible.
> > >
> >
> > Sorry for the confusion, I meant to say emit them after the barrier (e.g.
> > in snapshotState).
> >
> > On Thu, Nov 4, 2021 at 9:49 AM Till Rohrmann <tr...@apache.org> wrote:
> >
> > > Thanks for the detailed description Arvid.
> > >
> > > I might misunderstand things but one comment concerning:
> > >
> > > | We could even optimize the writer to only emit the committables after
> > > notifyCheckpointComplete as long as we retain them in the state of the
> > > writer.
> > >
> > > Emitting records for downstream operators in or after
> > > notifyCheckpointComplete no longer works after FLIP-147 when executing the
> > > final checkpoint. The problem is that the final checkpoint happens after
> > > the EOI and we would like to keep the property that you can terminate the
> > > whole topology with a single checkpoint, if possible.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Nov 4, 2021 at 9:05 AM Arvid Heise <ar...@apache.org> wrote:
> > >
> > >> Hi folks,
> > >>
> > >> thanks for the lively discussion. Let me present my point of view on a
> > >> couple of items:
> > >>
> > >> *Impact on checkpointing times*
> > >>
> > >> Currently, we send the committables of the writer downstream before the
> > >> barrier is sent. That allows us to include all committables in the state of
> > >> the committers, such that the committer receives all committables before it
> > >> snapshots.
> > >>
> > >> All proposals now add a processing step on the committables where certain
> > >> committables are virtually merged into larger committables. However, none
> > >> of the approaches require the physical merge to happen before the barrier
> > >> is sent downstream. In fact, we can even delay the virtual merge after the
> > >> checkpoint has been fully taken. We could even optimize the writer to only
> > >> emit the committables after notifyCheckpointComplete as long as we
> > >> retain them in the state of the writer. The important point is that the
> > >> committables become part of the checkpoint, but they are always committed
> > >> only after notifyCheckpointComplete. Especially the retry mechanism of
> > >> 1.14 decouples the exact time of the commit from notifyCheckpointComplete.
> > >> The retry happens asynchronously, so there is no reason to believe that we
> > >> can't do the merging asynchronously with any option.
> > >>
> > >> Naturally, all approaches delay the checkpoint barrier a bit by either
> > >> adding RPC calls or shuffles but the impact is rather minimal in a well
> > >> configured system (the number of committables is assumed to be tiny), so
> > >> I'm assuming a tad higher checkpointing time because the topology is more
> > >> complex (in all cases).
> > >>
> > >> *Impact on latency*
> > >>
> > >> All approaches will also delay the effective commit, since additionaly
> > >> work needs to be done but I'd argue that this is by design and should be
> > >> clear to everyone. Especially, when merging files across checkpoints,
> > >> certain results will not be visible until much later. Once, we settle for
> > >> an approach, we should think which options we give to sink developers and
> > >> end-users to impact that latency.
> > >>
> > >> An important aspect here is that we also refine the contract on
> > >> GlobalCommitter. Currently, it's not clear when it is supposed to be
> > >> called; for example, to register files in a metastore. Naively, I would
> > >> have said that the GlobalCommitter is invoked when all committables of a
> > >> certain checkpoint have been committed.
> > >> a) But what happens in the case of a failure and retry? Do we delay until
> > >> the commit finally happens?
> > >> b) What do we do with committables that are held back for compaction? Do
> > >> we global commit when all committables of checkpoint A are committed
> > >> ignoring small files? Or do we wait until a later checkpoint, when all
> > >> small files of A have been merged such that indeed all data of A has been
> > >> committed.
> > >>
> > >> *Re: Developers understand that the cost is relatively high.*
> > >>
> > >> Yes that is a valid concern that we already have with committer and
> > >> global committer (which none of the new users understand). I don't like
> > >> that we have so many Optional methods where it's not clear which methods to
> > >> implement to achieve certain functionality. Ideally, we split up the Sink
> > >> in many smaller components where you add certain traits. For example,
> > >> HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
> > >> HasGlobalCommitter (there are probably better names for the traits)
> > >> We still can change that as everything is experimental (and annoy
> > >> connector devs) but it seems to be an orthogonal discussion.
> > >>
> > >> If we compare Option 2 with Option 3, I'd argue that the mental model of
> > >> the connector dev is more stressed with option 3 than with 2. He needs to
> > >> map the high-level concepts of the current Sink to the low-level concepts
> > >> of DataStream. He needs to understand the data that is being sent between
> > >> writer and committer to be able to hook in.
> > >> Note that we need to move away from just sending CommT and wrap it with
> > >> some metadata, such as checkpoint id and subtask id.
> > >>
> > >> The dev also needs to care about managing the state, which we may
> > >> abstract in Option 2 (not entirely sure). And the dev needs to understand
> > >> the task life-cycle to emit the remaining committables before the job shuts
> > >> down (is that even possible on DataStream level? Are we notified on EOI or
> > >> do we expect devs to use operator API?). Lastly, if things are done
> > >> asynchronously as you have championed, the user also needs to take care of
> > >> ensuring that all async tasks are done before shutdown.
> > >>
> > >> *Re 2. Repeated implementation*
> > >>
> > >> The newly introduced `aggregate` can set the parallelism, thus perhaps
> > >>> `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism`
> > >>> also need to be supported? If they are supported, additional workloads are
> > >>> required, and at the same time I feel that these workloads are unnecessary;
> > >>> if not, it will also increase the developers’ understanding cost: for
> > >>> example, why these operators can not set these attributes?
> > >>>
> > >>
> > >> I'm assuming that you mean that we are about to replicate API between
> > >> DataStream and Sink compactor? I wouldn't do that. I would actually also
> > >> fix the parallelism to the writer's parallelism. So we just have a pre-key,
> > >> aggregate, and post-key where we can have defaults for the keys. We should
> > >> provide access to metadata of CommT through a context. I'd always run
> > >> compaction in the same slot sharing group as the writer, similar to how the
> > >> committer is run. I don't see why we should take a different route with the
> > >> design of the compaction than with the committer where we abstract all
> > >> these things away.
> > >>
> > >> Also for another
> > >>> example, `SinkWriter` has introduced many existing (or repeated)
> > >>> implementations in the DataStream API in order to support ‘State’,
> > >>> ‘Timer’,
> > >>> ‘asynchronization’, ‘subindex’, etc.
> > >>>
> > >>
> > >> I agree that the repeated implementations for Timer is weird but that
> > >> could have been solved similar to how we solved it with asynchronization
> > >> where we did not replicate interfaces but rather pulled up the concept of
> > >> MailboxExecutor from datastream to core.
> > >> Having specific Context objects is imho actually a good design decision -
> > >> RuntimeContext is heavily overloaded and shouldn't be emulated or used.
> > >>
> > >>
> > >>> In addition, if a new feature is introduced on the DataStream in the
> > >>> future, I think it may be necessary to consider the special operators of
> > >>> the sinks separately, which may also be a burden.
> > >>>
> > >>
> > >> Yes, option 2 has a higher maintenance cost on Flink since Flink need to
> > >> provide these operators but this gives us also more control about changing
> > >> things in the future. If we expose the elements that we exchange between
> > >> the different sink operators, we cannot change that in the future. Now we
> > >> can easily abstract a change in the elements away.
> > >> Regarding new features in DataStream: I think this is actually an
> > >> advantage. We can evolve DataStream API without thinking about how that
> > >> could screw up potential sinks.
> > >>
> > >> *Re 2.Aggregator(I think 1 parallelism is OK, why is it multiple
> > >> parallelism?)*
> > >>
> > >> If a user wants to aggregate with parallelism 1, he can use a constant
> > >> key (which we should describe in the javadoc) and that should be default.
> > >> I think that a higher parallelism makes sense if you want to compact
> > >> across checkpoints where you probably want to aggregate the files in
> > >> different buckets and you can process buckets in parallel.
> > >>
> > >>
> > >>
> > >> I hope to have cleared things up a bit. I guess it became obvious that I
> > >> prefer Option 2 because it gives us more means to provide a good
> > >> implementation and help the sink developer while also giving us flexibility
> > >> in the future. Happy to go more into details on certain points.
> > >>
> > >>
> > >>
> > >> On Thu, Nov 4, 2021 at 7:20 AM Guowei Ma <gu...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> Very thanks Fabian for drafting this FLIP! It looks very good to me. I see
> > >>> currently most of us agree with option 2, but I personally feel that option
> > >>> 3 may be better :-)
> > >>> I have some small concerns for option 2
> > >>>
> > >>> 1. Developers understand that the cost is relatively high.
> > >>> The merging of small files is very important, but from the perspective of
> > >>> sink as a whole, this requirement is only a special case of sink. We expose
> > >>> this requir
[message truncated...]
