Thanks for bringing this up, Liwei.

I think this is a good area to think about. Watermarks based on a
monotonically increasing column are definitely useful for driving
downstream applications, and we should build and document patterns
around how to use them.

My main objection to this proposal is that it is partition-based. Data
partitioning can change and should never be used to drive downstream jobs.
Partitioning is a way to manage data within a table and we don't want to
make it part of a guarantee or contract across tables. The right way to
drive downstream processes is to use a watermark and allow the downstream
to select the data that's needed, regardless of the physical layout of the
table.
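To make the contrast concrete, here's a minimal sketch (plain Python, not
the Iceberg API; the rows and column names are made up) of a downstream job
selecting data by a watermark on an event-time column rather than by
partition:

```python
from datetime import datetime

# Hypothetical rows from a table; which physical partition each row lives
# in is irrelevant to the downstream job.
rows = [
    {"event_ts": datetime(2023, 6, 16, 10, 59), "partition": "hour=10"},
    {"event_ts": datetime(2023, 6, 16, 11, 30), "partition": "hour=11"},
    {"event_ts": datetime(2023, 6, 16, 11, 58), "partition": "hour=11"},
    {"event_ts": datetime(2023, 6, 16, 12, 1),  "partition": "hour=12"},
]

def select_ready(rows, last_watermark, watermark):
    """Select rows whose event time falls in (last_watermark, watermark],
    regardless of how the table is physically laid out."""
    return [r for r in rows if last_watermark < r["event_ts"] <= watermark]

ready = select_ready(rows,
                     last_watermark=datetime(2023, 6, 16, 11, 0),
                     watermark=datetime(2023, 6, 16, 12, 0))
# The hour=11 rows are selected, but by event time, not by partition.
```

If partitioning later changes from hourly to daily, this selection still
returns exactly the same rows, which is the point of the contract.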

I think there are also specific parts of the proposal that are likely
misleading. The use of a _SUCCESS file, for example, is intended to carry
forward a Hive convention, but people should not treat Iceberg partitions
like Hive partitions by looking at the files underneath the table. If you
watch for a _SUCCESS file, are you also listing the directory to see what
is in the table? Seems like an easy mistake to make.

Instead of introducing a partition "commit" or anything partition-based in
the API, I think we should formalize how to define watermarks for a column
and access them in snapshot summary metadata. Documenting that and then
creating patterns around it is going to be a lot better for users in the
long run.
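As a rough illustration of what that could look like (the
`watermark.<column>` property name is an assumption for illustration, not an
existing Iceberg convention), a writer would record the column watermark as
a summary property on each commit, and a downstream reader would scan
snapshot summaries for the highest value:

```python
# Model snapshots as (snapshot_id, summary) pairs; in Iceberg, the summary
# is a string-to-string map attached to each snapshot.
snapshots = [
    (1, {"watermark.event_ts": "2023-06-16T11:00:00"}),
    (2, {"watermark.event_ts": "2023-06-16T12:00:00"}),
    (3, {}),  # a commit that did not advance the watermark
]

def current_watermark(snapshots, column):
    """Return the highest watermark recorded for a column across snapshots.
    ISO-8601 timestamp strings compare correctly as strings."""
    key = f"watermark.{column}"
    values = [summary[key] for _, summary in snapshots if key in summary]
    return max(values) if values else None

wm = current_watermark(snapshots, "event_ts")
```

A downstream job would then poll (or be notified of) new snapshots and use
the watermark to select data, with no knowledge of the table's layout.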

Ryan

On Fri, Jun 16, 2023 at 1:30 AM Liwei Li <hilili...@gmail.com> wrote:

> Hi guys:
>
> We have drafted a proposal on the notification mechanism after Flink
> partition commit and would like to hear your suggestions. Could someone
> help to review if you are interested? Thank you for your assistance.
>
>
> Flink Support Partition Commit notification for append data
> After writing a partition, it is often necessary to notify downstream
> applications.
>
>
> For example, given a table partitioned by hour: when the watermark based
> on event time reaches 12:00:00, we consider the data of the hour-11
> partition complete and usable by downstream applications, so we commit
> this partition to the metadata (create a new snapshot). Then we make a
> remote API call to notify the task flow that it can start the Spark batch
> step for the hour-11 partition.
>
> The current default commit mode, which depends on checkpoints, can be
> understood as using processing time to determine when table partitions
> are committed. The partition commit strategy in this PR can be understood
> as using event time to decide whether to commit table partitions.
>
>    -
>
>    Policy: what to do when committing the partition. Built-in policies
>    support `default` (commit only to metadata) and `success files` (for
>    file-like systems, e.g. HDFS). You can also implement your own
>    policies; standardized interfaces are provided so that developers can
>    easily add custom extensions, such as merging small files, remote API
>    calls, etc.
>
> Compatibility
>
> This PR maintains compatibility with the Flink ecosystem.
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#writing
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#partition-commit
>
>
> NOTE: Partition Commit only works with dynamic partition inserts.
>
>
> Q: What happens when the hour-10 partition's data arrives again after
> that partition has already been committed?
>
> A: When the hour-11 partition is about to be committed, a message that
> both the hour-10 and hour-11 partitions are committed is sent to the
> receiver, and the receiver decides what action to take.
>
> The above is a partial introduction of the proposal. For the full
> document, please refer to:
> https://docs.google.com/document/d/1Sobv8XbvsyPzHi1YWy_jSet1Wy7smXKDKeQrNZSFYCg
>
>
> Thank you very much for the valuable suggestions from @Steven and @Junjie
> Chen.
>
> Thanks,
> Liwei Li
>
>

-- 
Ryan Blue
Tabular
