wuwenchi opened a new pull request, #4625:
URL: https://github.com/apache/iceberg/pull/4625

   We hope Iceberg can support watermarks and computed columns for Flink SQL.
   The proposed design is as follows:
   
   
   # Background and Motivation
   
   We have a temporal table in Iceberg that needs to be joined in real time 
with other tables, so a watermark must be defined on it. However, a watermark 
places requirements on the attributes of its time field, and the time field in 
the source table may not satisfy them, so the source table's time field must be 
converted, either by recomputing it or by calling a function.
   
   **Why not directly use the Iceberg connector table as defined in Flink?**
   
   1. Tables defined in Flink's in-memory catalog disappear when the job ends; 
to use such a table again, it must be recreated.
   2. In our scenarios, a temporal table is joined with multiple other tables, 
and the watermark attributes depend only on the temporal table itself, so they 
are identical across all of these jobs. We therefore want to bind these 
attributes directly to the temporal table, so that when the table is used again 
later, there is no need to look up its watermark settings in other jobs.
   
   # Goal
   
   1. Iceberg supports setting watermarks and computed columns through Flink SQL.
   2. The watermark and computed-column attributes can be changed by modifying 
the table properties.
   
   # Proposal
   
   ## Table property save format
   
   ### Example
   
   ```sql
   CREATE TABLE tl (
     id INT,
     id2 AS id * 2,
     f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id * 3)),
     t1 TIMESTAMP(6),
     t2 AS CAST(t1 AS TIMESTAMP(3)),
     WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND
   );
   ```
   
   Then the table properties are saved as:
   
   ```properties
   flink.computed-columns.id2 = `id` * 2
   flink.computed-columns.f1 = TO_TIMESTAMP(FROM_UNIXTIME(`id` * 3))
   flink.computed-columns.t2 = CAST(`t1` AS TIMESTAMP(3))
   flink.watermark.t2 = `t2` - INTERVAL '5' SECOND
   ```
   
   ### Key format
   
   Fixed prefix + field name:
   
   - fixed prefix for watermarks: `flink.watermark.`
   - fixed prefix for computed columns: `flink.computed-columns.`
   
   ### Value format
   
   The expression as defined by the user.
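   The key construction can be sketched as below. The class and method names 
are assumptions for illustration only, not an actual Iceberg API; only the two 
prefix strings come from the proposal.
   
   ```java
   // Hypothetical helper illustrating the proposed key format.
   public class FlinkPropertyKeys {
       static final String WATERMARK_PREFIX = "flink.watermark.";
       static final String COMPUTED_COLUMN_PREFIX = "flink.computed-columns.";
   
       // flink.watermark.<field name>
       static String watermarkKey(String fieldName) {
           return WATERMARK_PREFIX + fieldName;
       }
   
       // flink.computed-columns.<field name>
       static String computedColumnKey(String fieldName) {
           return COMPUTED_COLUMN_PREFIX + fieldName;
       }
   }
   ```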
   
   ## The way of addition, deletion and modification
   
   1. Flink SQL DDL (supports adding only)
   
   ```sql
   CREATE TABLE `hive_catalog`.`default`.`sample` (
     id INT,
     id2 AS id * 2,
     f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id * 3)),
     t1 TIMESTAMP(6),
     t2 AS CAST(t1 AS TIMESTAMP(3)),
     WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND
   );
   ```
   
   2. Table property syntax
   
   - add or update
   
   ```sql
   ALTER TABLE `hive_catalog`.`default`.`sample` SET (
        'flink.computed-columns.id2'='id*3'
   )
   ```
   
   - delete
   
   ```sql
   ALTER TABLE `hive_catalog`.`default`.`sample` RESET (
        'flink.computed-columns.id2'
   )
   ```
   
   ## Solution
   
   ### add table process
   
   1. If the table defines computed columns, save their expressions to the 
table properties.
   2. If the table defines a watermark, save its expression to the table 
properties.
   
   ### alter table properties process
   
   1. Merge the modified properties with the original properties to get the 
table's updated properties.
   
   2. Generate the Flink table from the merged properties and the original 
table's schema, and verify the schema. (This guards against errors in the 
expressions, such as referencing a non-existent function or column name in a 
computed-column expression.)
   
   3. If the verification in the previous step succeeds, commit the property 
modification; otherwise, report an exception and do not commit.
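   The merge-validate-commit flow can be sketched as follows. The validator 
stands in for rebuilding the Flink table and resolving the expressions; all 
names here are assumptions for illustration.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Sketch of the alter-properties flow: merge the update into the current
   // properties, verify the result, and hand it back for commit only when
   // verification succeeds.
   public class AlterPropertiesSketch {
       interface SchemaValidator {
           // Throws IllegalArgumentException when an expression cannot be
           // resolved against the table schema.
           void validate(Map<String, String> mergedProps);
       }
   
       static Map<String, String> mergeAndValidate(Map<String, String> current,
                                                   Map<String, String> updates,
                                                   SchemaValidator validator) {
           Map<String, String> merged = new HashMap<>(current);
           merged.putAll(updates);      // step 1: merge
           validator.validate(merged);  // step 2: verify; throws on a bad expression
           return merged;               // step 3: safe to commit
       }
   }
   ```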
   
   ### get table process
   
   1. Generate the Flink table by combining the current table properties with 
the table schema, and verify the schema. (This guards against the table schema 
having been modified by another engine, which would make a computed-column 
expression invalid. For example, a computed column is defined as `id` * 3, but 
the `id` column was later dropped with Spark and the corresponding 
computed-column property was not removed.)
   
   2. If the verification succeeds, the table is returned. 
      If it fails, the computed columns and watermarks in the table properties 
are ignored, and the original physical schema of the table is returned 
directly.
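   The fallback behavior can be sketched generically as below: try to build 
the enriched schema, and on failure return the physical schema unchanged. The 
class name and use of `Supplier` are illustrative assumptions.
   
   ```java
   import java.util.function.Supplier;
   
   // Sketch of the get-table fallback: attempt to build the Flink table with
   // computed columns and watermark applied; if expression resolution fails
   // (e.g., a referenced column was dropped by another engine), ignore those
   // properties and fall back to the physical schema.
   public class GetTableSketch {
       static <T> T buildOrFallback(Supplier<T> enrichedSchema, T physicalSchema) {
           try {
               return enrichedSchema.get();  // verify: resolve expressions against the schema
           } catch (RuntimeException e) {
               return physicalSchema;        // ignore computed columns / watermarks
           }
       }
   }
   ```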
   
   
   
   
   
   

