Thank you for your valuable feedback, Ryan.

> My main objection to this proposal is that it is partition based. Data
> partitioning can change and should never be used to drive downstream jobs.
> Partitioning is a way to manage data within a table and we don't want to
> make it part of a guarantee or contract across tables. The right way to
> drive downstream processes is to use a watermark and allow the downstream
> to select the data that's needed, regardless of the physical layout of the
> table.
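For concreteness, here is a minimal sketch of what such watermark-driven, layout-independent consumption could look like with the Iceberg Java API. It is an illustration only, not an existing feature: the snapshot-summary key `flink.watermark` and the column name `event_time` are assumptions.

```java
// Minimal illustration only; not an existing Iceberg or Flink feature.
// Assumptions: the upstream writer records its watermark (epoch micros) in the
// snapshot summary under the key "flink.watermark", and the table has an
// event-time column named "event_time". Both names are hypothetical.
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;

public class WatermarkDrivenScan {

  /** Plans a scan over rows at or below the upstream watermark, independent of partitioning. */
  public static TableScan upToWatermark(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return null; // empty table, nothing published yet
    }

    Map<String, String> summary = current.summary();
    String watermark = summary.get("flink.watermark"); // hypothetical summary key
    if (watermark == null) {
      return null; // upstream has not published a watermark yet
    }

    long watermarkMicros = Long.parseLong(watermark);
    // Select by event time; the table's physical partition layout does not matter.
    return table.newScan()
        .useSnapshot(current.snapshotId())
        .filter(Expressions.lessThanOrEqual("event_time", watermarkMicros));
  }
}
```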
Previously, I raised PR #6253 <https://github.com/apache/iceberg/pull/6253>, which wrote the task's watermark to the snapshot summary. However, it does not provide a downstream notification mechanism. Maybe we can combine it with this idea and use the watermark, instead of the partition, to drive downstream applications or trigger custom actions. For example, we could define two configs:

- *Time unit*: the unit used to extract numerical values from the watermark.
- *Delay duration*: the time offset before initiating the next notification.

Say we want to trigger a notification every hour. We can set the time unit to `hour` and the delay duration to 1:

1. When the Flink task starts, take the next *hour* (time unit) after the current watermark as the notification time.
2. On each watermark update, extract the truncated hour value of the current watermark. When this value reaches the notification time, trigger the notification.
3. Increment the notification time by *1* (delay duration) to get the next notification time.

(A rough sketch of this trigger logic appears after the quoted thread below.) This approach eliminates the dependence on the physical layout of the table and decides whether to trigger an event based only on the watermark.

Besides the notification mechanism, when to commit the data to the table is also an important point to consider. If a batch of data has not fully arrived (the watermark has not reached a specific threshold), the user may not want the data of that batch to be visible to the downstream. This part may refer to the WAP (write-audit-publish) mechanism in Spark: only when the watermark reaches the threshold are the stage-only snapshots committed.

For the _SUCCESS file, as a built-in customizable trigger policy, I also struggled with whether to include it at first. What you said makes a lot of sense. We should let users avoid manipulating the underlying file system directly and use the metadata tables instead. The _SUCCESS file may only exist to reduce the migration cost, but it is inconsistent with the design philosophy of Iceberg. We should remove it.

I think both sending notifications and committing snapshots can be viewed as a watermark-based event mechanism that allows developers to add custom extensions.

Thanks,
Liwei

On Sat, Jun 17, 2023 at 12:19 AM Ryan Blue <b...@tabular.io> wrote:

> Thanks for bringing this up, Liwei.
>
> I think this is a good area to think about. Watermarks based on a
> monotonically increasing column are definitely useful for driving
> downstream applications and I think we should definitely build and
> document patterns around how to use them.
>
> My main objection to this proposal is that it is partition based. Data
> partitioning can change and should never be used to drive downstream jobs.
> Partitioning is a way to manage data within a table and we don't want to
> make it part of a guarantee or contract across tables. The right way to
> drive downstream processes is to use a watermark and allow the downstream
> to select the data that's needed, regardless of the physical layout of the
> table.
>
> I think there are also specific parts of the proposal that are likely
> misleading. The use of a _SUCCESS file, for example, is intended to carry
> forward a Hive convention, but people should not treat Iceberg partitions
> like Hive partitions by looking at the files underneath the table. If you
> watch for a _SUCCESS file, are you also listing the directory to see what
> is in the table? Seems like an easy mistake to make.
>
> Instead of introducing a partition "commit" or anything partition based in
> the API, I think we should formalize how to define watermarks for a column
> and access them in snapshot summary metadata. Documenting that and then
> creating patterns around it is going to be a lot better for users in the
> long run.
>
> Ryan
>
> On Fri, Jun 16, 2023 at 1:30 AM Liwei Li <hilili...@gmail.com> wrote:
>
>> Hi guys:
>>
>> We have drafted a proposal on the notification mechanism after Flink
>> partition commit and would like to hear your suggestions. Could someone
>> help to review it if you are interested? Thank you for your assistance.
>>
>> Flink Support Partition Commit notification for append data
>>
>> After writing a partition, it is often necessary to notify downstream
>> applications. [image: image.png]
>>
>> For example, for a table partitioned by hour, when the watermark based
>> on event time reaches 12:00:00, we consider the data of the hour-11
>> partition complete and usable by downstream applications, so we commit
>> this partition to the metadata (create a new snapshot). Then we make a
>> remote API call to notify the task flow that it can start the Spark
>> batch step for the hour-11 partition.
>>
>> The current default commit mode, which depends on checkpoints, can be
>> understood as using processing time to decide when to commit table
>> partitions. The partition commit strategy in this PR can be understood
>> as using event time to decide whether to commit table partitions.
>>
>> - Policy: what needs to be done when committing the partition. Built-in
>>   policies support `default` (only commit to metadata) and `success
>>   files` (for file-like systems, e.g. HDFS), and you can also implement
>>   your own policies; standardized interfaces are provided so that
>>   developers can easily add custom extensions, such as merging small
>>   files, remote API calls, etc.
>>
>> Compatibility
>>
>> This PR maintains compatibility with the Flink ecosystem:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#writing
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/#partition-commit
>>
>> NOTE: Partition commit only works with dynamic partition inserting.
>>
>> Q: What happens when data for partition 10 arrives again after partition
>> 10 has been committed?
>>
>> S: The message that both partitions 10 and 11 are committed is sent to
>> the receiver when partition 11 is about to be committed, and the
>> receiver decides what action to take.
>>
>> The above is a partial introduction to the proposal. For the full
>> document, please refer to:
>> https://docs.google.com/document/d/1Sobv8XbvsyPzHi1YWy_jSet1Wy7smXKDKeQrNZSFYCg
>>
>> Thank you very much for the valuable suggestions from @Steven and @Junjie
>> Chen.
>>
>> Thanks,
>> Liwei Li
>>
>
> --
> Ryan Blue
> Tabular
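As a concrete illustration of the watermark-based trigger mechanism described in Liwei's reply above (time unit plus delay duration), here is a minimal sketch. It is not part of Iceberg or Flink; the `Notifier` callback and every other name in it are hypothetical.

```java
// Rough sketch of the watermark-based notification trigger described in Liwei's
// reply above (time unit + delay duration). Not part of Iceberg or Flink; the
// Notifier callback and all other names are hypothetical, for illustration only.
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WatermarkNotificationTrigger {

  /** Hypothetical callback, e.g. a remote API call that starts a downstream batch job. */
  public interface Notifier {
    void send(Instant completeUpTo);
  }

  private final ChronoUnit timeUnit;    // e.g. ChronoUnit.HOURS
  private final long delay;             // e.g. 1 -> one notification per hour
  private final Notifier notifier;
  private Instant nextNotificationTime; // threshold the watermark must reach

  public WatermarkNotificationTrigger(
      ChronoUnit timeUnit, long delay, Notifier notifier, Instant initialWatermark) {
    this.timeUnit = timeUnit;
    this.delay = delay;
    this.notifier = notifier;
    // Step 1: when the task starts, take the next time-unit boundary after the
    // current watermark as the first notification time.
    this.nextNotificationTime = initialWatermark.truncatedTo(timeUnit).plus(1, timeUnit);
  }

  /** Step 2: call on every watermark advance; fires once the truncated watermark reaches the threshold. */
  public void onWatermark(Instant watermark) {
    Instant truncated = watermark.truncatedTo(timeUnit);
    while (!truncated.isBefore(nextNotificationTime)) {
      // Data before nextNotificationTime is considered complete; notify downstream.
      notifier.send(nextNotificationTime);
      // Step 3: increment the notification time by the delay duration.
      nextNotificationTime = nextNotificationTime.plus(delay, timeUnit);
    }
  }
}
```

With `timeUnit = ChronoUnit.HOURS` and `delay = 1`, a watermark passing 12:00:00 would fire a single notification meaning "data before 12:00 is complete", which could in turn start the downstream batch step for the 11:00 hour in the thread's example.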