Thank you for your valuable feedback, Ryan.

My main objection to this proposal is that it is partition based. Data
> partitioning can change and should never be used to drive downstream jobs.
> Partitioning is a way to manage data within a table and we don't want to
> make it part of a guarantee or contract across tables. The right way to
> drive downstream processes is to use a watermark and allow the downstream
> to select the data that's needed, regardless of the physical layout of the
> table.

Previously, I raised PR #6253 <https://github.com/apache/iceberg/pull/6253>,
which writes the task's watermark to the snapshot summary. However, it does
not provide a downstream notification mechanism. Maybe we can build on it so
that the watermark, rather than the partition, drives downstream
applications or triggers custom actions.
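As a rough illustration, a downstream job could poll the watermark from the
snapshot summary like this (a minimal sketch; the summary key
`flink.watermark` and the class name are assumptions for illustration, not
necessarily what the PR writes):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Minimal sketch: read a watermark that a writer stored in the snapshot
// summary. The key "flink.watermark" is a hypothetical name.
public class WatermarkReader {
  static long readWatermark(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return Long.MIN_VALUE; // table has no snapshots yet
    }
    String value = current.summary().get("flink.watermark"); // assumed key
    return value == null ? Long.MIN_VALUE : Long.parseLong(value);
  }
}
```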

For example, we define two configs:

   - *Time unit*, the unit used to extract numerical values from the
   watermark
   - *Delay duration*, the time offset for initiating the next notification

Let's say we want to trigger a notification every hour. We can set the time
unit to `hour` and the delay duration to 1:

   1. When the Flink task starts, retrieve the next *hour* (time unit) from
   the current watermark as the notification time.
   2. Extract the truncated hour value of the current watermark. When this
   value reaches the notification time, trigger the notification.
   3. Increment the notification time by *1* (delay duration) to get the
   next notification time.

This example illustrates an approach that eliminates the dependence on the
physical layout of the table and decides whether to trigger an event based
solely on the watermark. A sketch of this trigger logic follows.
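Here is a minimal sketch of that logic, assuming the watermark is an
epoch-millisecond long; the class and method names are hypothetical:

```java
import java.time.Duration;

// Hypothetical sketch of the watermark-driven trigger described above,
// with time unit = hour and delay duration = 1.
public class NotificationTrigger {
  private final long unitMillis = Duration.ofHours(1).toMillis(); // time unit
  private final long delayUnits = 1;                              // delay duration
  private long notificationTime = Long.MIN_VALUE;

  // Called whenever a new watermark (epoch millis) is observed.
  void onWatermark(long watermarkMillis) {
    long truncated = watermarkMillis - (watermarkMillis % unitMillis);
    if (notificationTime == Long.MIN_VALUE) {
      // Step 1: initialize the notification time to the next hour boundary.
      notificationTime = truncated + unitMillis;
      return;
    }
    while (truncated >= notificationTime) {
      // Step 2: the truncated watermark reached the notification time.
      sendNotification(notificationTime);
      // Step 3: advance by the delay duration for the next notification.
      notificationTime += delayUnits * unitMillis;
    }
  }

  void sendNotification(long boundaryMillis) {
    // Placeholder: remote API call, event emission, etc.
  }
}
```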

Besides the notification mechanism, when to commit the data to the table is
also an important point to consider. If a batch of data has not fully
arrived (the watermark has not yet reached a specific threshold), the user
may not want the data of that batch to be visible to downstream
applications. This part may refer to the WAP (write-audit-publish)
mechanism in Spark: only when the watermark reaches the threshold are the
staged snapshots published.
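For reference, gating visibility with Iceberg's staged (WAP) snapshots from
Spark could look roughly like this (the table name, wap id, and catalog
prefix are placeholders; exact setup depends on the deployment):

```java
import org.apache.spark.sql.SparkSession;

// Rough sketch: stage writes via WAP, then publish once the watermark
// crosses the threshold. All identifiers here are placeholders.
public class WapGateSketch {
  static void stageAndPublish(SparkSession spark, long stagedSnapshotId) {
    // Enable write-audit-publish on the table.
    spark.sql("ALTER TABLE db.events SET TBLPROPERTIES ('write.wap.enabled'='true')");

    // Writes under this wap id create staged snapshots that are not
    // visible in the table's current state.
    spark.conf().set("spark.wap.id", "batch-hour-11");
    // ... run the batch write here ...

    // When the watermark reaches the threshold, publish the staged snapshot.
    spark.sql("CALL catalog.system.cherrypick_snapshot('db.events', " + stagedSnapshotId + ")");
  }
}
```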

For the _SUCCESS file, as a built-in customizable trigger policy, I also
struggled at first with whether to include it. What you said makes a lot of
sense: we should keep users from manipulating the underlying file system
directly and have them use the metadata tables instead. The _SUCCESS file
would exist only to reduce migration cost, and it is inconsistent with the
design philosophy of Iceberg. We should remove it.
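For instance, instead of watching for a _SUCCESS file, a downstream job can
query the snapshots metadata table (a sketch fragment; "db.events" is a
placeholder table name):

```java
// Query Iceberg's snapshots metadata table from Spark instead of listing files.
spark.sql("SELECT committed_at, snapshot_id, summary FROM db.events.snapshots")
     .show(false);
```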

I think both sending notifications and committing snapshots can be viewed
as a watermark-based event mechanism that allows developers to plug in
custom extensions.
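The extension point could be a small listener interface along these lines
(the interface and method names are hypothetical, not an existing Iceberg or
Flink API):

```java
// Hypothetical extension point for watermark-based events.
public interface WatermarkEventListener {
  // Invoked when the watermark crosses a notification boundary.
  void onNotification(long boundaryMillis);

  // Invoked when staged snapshots become eligible for publishing.
  void onCommitReady(long watermarkMillis);
}
```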


Thanks,
Liwei

On Sat, Jun 17, 2023 at 12:19 AM Ryan Blue <b...@tabular.io> wrote:

> Thanks for bringing this up, Liwei.
>
> I think this is a good area to think about. Watermarks based on a
> monotonically increasing column are definitely useful for driving
> downstream applications and I think we should definitely build and document
> patterns around how to use them.
>
> My main objection to this proposal is that it is partition based. Data
> partitioning can change and should never be used to drive downstream jobs.
> Partitioning is a way to manage data within a table and we don't want to
> make it part of a guarantee or contract across tables. The right way to
> drive downstream processes is to use a watermark and allow the downstream
> to select the data that's needed, regardless of the physical layout of the
> table.
>
> I think there are also specific parts of the proposal that are likely
> misleading. The use of a _SUCCESS file, for example, is intended to carry
> forward a Hive convention, but people should not treat Iceberg partitions
> like Hive partitions by looking at the files underneath the table. If you
> watch for a _SUCCESS file, are you also listing the directory to see what
> is in the table? Seems like an easy mistake to make.
>
> Instead of introducing a partition "commit" or anything partition based in
> the API, I think we should formalize how to define watermarks for a column
> and access them in snapshot summary metadata. Documenting that and then
> creating patterns around it is going to be a lot better for users in the
> long run.
>
> Ryan
>
> On Fri, Jun 16, 2023 at 1:30 AM Liwei Li <hilili...@gmail.com> wrote:
>
>> Hi guys:
>>
>> We have drafted a proposal on the notification mechanism after Flink
>> partition commit and would like to hear your suggestions. Could someone
>> help to review if you are interested? Thank you for your assistance.
>>
>>
>> Flink Support Partition Commit notification for append data
>> After writing a partition, it is often necessary to notify downstream
>> applications.
>>
>>
>> For example, for a table partitioned by hour, when the event-time
>> watermark reaches 12:00:00 we consider the data of the 11:00 hour
>> partition complete and usable by downstream applications, so we commit
>> this partition to the metadata (create a new snapshot). Then we make a
>> remote API call to notify the task flow that it can start the Spark
>> batch step for the 11:00 partition.
>>
>> The current default commit mode, which depends on checkpoints, can be
>> understood as using processing time to determine table partition commits.
>> The partition commit strategy in this PR can be understood as using event
>> time to decide whether to commit table partitions.
>>
>>    - Policy: what to do when committing the partition. Built-in policies
>>    support `default` (only commit to metadata) and `success files` (for
>>    file-like systems, e.g. HDFS), and you can also implement your own
>>    policies. Standardized interfaces are provided so that developers can
>>    easily add custom extensions, such as merging small files, remote API
>>    calls, etc.
>>
>> Compatibility
>>
>> This PR maintains compatibility with the Flink ecosystem.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#writing
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#partition-commit
>>
>>
>> NOTE: Partition commit only works with dynamic partition insertion.
>>
>>
>> Q: What happens when data for partition 10 arrives again after partition
>> 10 has been committed?
>>
>> A: When partition 11 is about to be committed, a message that both
>> partitions 10 and 11 are committed is sent to the receiver, and the
>> receiver decides what action to take.
>>
>> The above is a partial introduction of the proposal. For the full
>> document, please refer to:
>> https://docs.google.com/document/d/1Sobv8XbvsyPzHi1YWy_jSet1Wy7smXKDKeQrNZSFYCg
>>
>>
>> Thank you very much for the valuable suggestions from @Steven and @Junjie
>> Chen.
>>
>> Thanks,
>> Liwei Li
>>
>>
>
> --
> Ryan Blue
> Tabular
>
