wuwenchi opened a new pull request, #4625:
URL: https://github.com/apache/iceberg/pull/4625
We hope Iceberg can support watermarks and computed columns.
The specific implementation details are as follows:
# Background and Motivation
There is a temporal table in Iceberg that needs to be joined in real time with
other tables, so a watermark needs to be defined on it. However, a watermark
places requirements on the attributes of its time field, and the time fields in
the source table may not meet those requirements, so the source table's time
fields need to be converted by recalculation or by calling functions.
**Why not directly use the Iceberg connector table supported by Flink?**
1. All Flink tables are stored in memory. When the job ends, the table
disappears; to use it again, it has to be created again.
2. In our scenarios, a temporal table needs to be joined with multiple tables,
and the watermark attributes relate only to the temporal table, so those
attributes are the same across all of these jobs. We therefore want to bind the
attributes directly to the temporal table, so that when the table is used again
later there is no need to look up the watermark settings in other jobs.
# Goal
1. Iceberg supports setting watermarks and computed columns through Flink SQL.
2. The watermark and computed-column attributes can be changed by modifying the
table properties.
# Proposal
## Table property storage format
### Example
```sql
CREATE TABLE tl (
  id INT,
  id2 AS id * 2,
  f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id * 3)),
  t1 TIMESTAMP(6),
  t2 AS CAST(t1 AS TIMESTAMP(3)),
  WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND
);
```
Then the table properties are saved as:
```properties
flink.computed-columns.id2 = `id` * 2
flink.computed-columns.f1 = TO_TIMESTAMP(FROM_UNIXTIME(`id` * 3))
flink.computed-columns.t2 = CAST(`t1` AS TIMESTAMP(3))
flink.watermark.t2 = `t2` - INTERVAL '5' SECOND
```
### Key format
fixed prefix + field name:
- fixed prefix for watermarks: `flink.watermark.`
- fixed prefix for computed columns: `flink.computed-columns.`
### Value format
the expression as defined by the user (see the sketch below).
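For illustration, a minimal sketch of how these keys could be composed and parsed; the class, constant, and helper names here are hypothetical and not part of this PR:

```java
class ComputedColumnProperties {
  // Fixed prefixes described above.
  static final String WATERMARK_PREFIX = "flink.watermark.";
  static final String COMPUTED_COLUMN_PREFIX = "flink.computed-columns.";

  // Build the property key for a computed column, e.g. "flink.computed-columns.id2".
  static String computedColumnKey(String columnName) {
    return COMPUTED_COLUMN_PREFIX + columnName;
  }

  // Recover the column name from a property key, or return null if the key is
  // not in the computed-column namespace.
  static String computedColumnName(String propertyKey) {
    return propertyKey.startsWith(COMPUTED_COLUMN_PREFIX)
        ? propertyKey.substring(COMPUTED_COLUMN_PREFIX.length())
        : null;
  }
}
```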
## How to add, delete, and modify
1. Flink SQL DDL, which only supports adding:
```sql
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id INT,
  id2 AS id * 2,
  f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id * 3)),
  t1 TIMESTAMP(6),
  t2 AS CAST(t1 AS TIMESTAMP(3)),
  WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND
);
```
2. Table property syntax (see the sketch after these examples)
- Add or update
```sql
ALTER TABLE `hive_catalog`.`default`.`sample` SET (
  'flink.computed-columns.id2'='id*3'
)
```
- Delete
```sql
ALTER TABLE `hive_catalog`.`default`.`sample` RESET (
  'flink.computed-columns.id2'
)
```
## Solution
### Add table process
1. If the table defines computed columns, save their expressions to the table
properties.
2. If the table defines a watermark, save its expression to the table
properties (see the sketch below).
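A minimal sketch of collecting these expressions from a resolved Flink schema, assuming Flink's `ResolvedSchema`/`Column.ComputedColumn` API and serializing expressions with `asSerializableString()`; the actual PR may encode the expressions differently:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;

class ComputedColumnPropertyWriter {
  // Collect computed-column and watermark expressions from the resolved Flink
  // schema and encode them as table properties under the fixed prefixes.
  static Map<String, String> toTableProperties(ResolvedSchema schema) {
    Map<String, String> props = new HashMap<>();
    for (Column column : schema.getColumns()) {
      if (column instanceof Column.ComputedColumn) {
        Column.ComputedColumn computed = (Column.ComputedColumn) column;
        props.put(
            "flink.computed-columns." + computed.getName(),
            computed.getExpression().asSerializableString());
      }
    }
    for (WatermarkSpec watermark : schema.getWatermarkSpecs()) {
      props.put(
          "flink.watermark." + watermark.getRowtimeAttribute(),
          watermark.getWatermarkExpression().asSerializableString());
    }
    return props;
  }
}
```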
### Alter table properties process
1. Merge the modified properties with the original properties to get the
table's updated properties.
2. Build the Flink table from the merged table properties and the original
table's schema, and validate the schema. (This prevents errors in the
expressions, such as a computed-column expression referencing a non-existent
function or column.)
3. If the validation in the previous step succeeds, commit the property change;
if not, throw an exception and do not commit (see the sketch below).
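A minimal sketch of the merge-and-validate flow; `validateSchema` is a hypothetical placeholder for the expression validation described above, not an existing API:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;

class AlterPropertiesFlow {
  // Merge the requested changes into the current properties, validate that the
  // resulting computed-column/watermark expressions still resolve against the
  // table schema, and only then commit the property change.
  static void alterProperties(Table table, Map<String, String> setProps, Iterable<String> removeKeys) {
    Map<String, String> merged = new HashMap<>(table.properties());
    merged.putAll(setProps);
    removeKeys.forEach(merged::remove);

    if (!validateSchema(table, merged)) {
      throw new IllegalArgumentException("Invalid computed-column or watermark expression");
    }

    UpdateProperties update = table.updateProperties();
    setProps.forEach(update::set);
    removeKeys.forEach(update::remove);
    update.commit();
  }

  static boolean validateSchema(Table table, Map<String, String> mergedProps) {
    // Placeholder: in the proposal this builds the Flink table from the merged
    // properties plus the Iceberg schema and checks that every expression resolves.
    return true;
  }
}
```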
### Get table process
1. Build the Flink table by combining the current table properties with the
table schema, and validate the schema. (This guards against the table's schema
having been modified by another engine, which would break a computed-column
expression. For example, the computed-column expression `id` * 3 refers to the
`id` column, but the column was later dropped with Spark and the corresponding
computed-column property was not removed.)
2. If the validation succeeds, the table is returned. If it fails, the computed
columns and watermarks in the table properties are ignored, and the table's
original physical schema is returned directly (see the sketch below).
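A minimal sketch of the fallback behaviour on load; `tryApplyProperties` is a hypothetical placeholder for the validation described above:

```java
import java.util.Map;
import java.util.Optional;
import org.apache.flink.table.api.Schema;
import org.apache.iceberg.Table;

class GetTableFlow {
  // Try to enrich the physical schema with computed columns and watermarks taken
  // from the table properties; if validation fails (e.g. a referenced column was
  // dropped by another engine), fall back to the plain physical schema.
  static Schema loadSchema(Table table, Schema physicalSchema) {
    Optional<Schema> enriched = tryApplyProperties(physicalSchema, table.properties());
    return enriched.orElse(physicalSchema);
  }

  // Placeholder for the validation described above: apply flink.computed-columns.*
  // and flink.watermark.* entries and return empty if any expression does not resolve.
  static Optional<Schema> tryApplyProperties(Schema physicalSchema, Map<String, String> props) {
    return Optional.empty();
  }
}
```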