[
https://issues.apache.org/jira/browse/FLINK-21634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582885#comment-17582885
]
Jane Chan commented on FLINK-21634:
-----------------------------------
Sorry for being late, but I encountered a non-trivial problem when working on
FLINK-22315.
h4. TableSchema vs. Schema and CatalogTableImpl vs. CatalogTable
Currently, all DDL-related functionality (such as create table, create table
like, alter table set/reset options, add/drop constraint, show create table,
etc.) is based on the deprecated CatalogTableImpl and TableSchema. There is no
reason for FLINK-22315 to stick to the old framework, but switching would leave
the original table and the altered table on inconsistent implementations, which
will cause new problems.
It is known that both TableSchema and Schema have some problems, e.g.
* TableColumn does not persist the column's comment, and thus `alter table add
comment` cannot be reflected in the updated DDL even if the comment is persisted
in the catalog using the new Column impl.
* Since TableSchema is deprecated, CatalogBaseTable#getSchema returns null by
default. Consider the following situation:
** First, create the table; the create path is implemented against the old
CatalogTableImpl, so the catalog persists a CatalogTableImpl instance.
** Then, alter the table's constraint/column, which rewrites the persisted
table as a CatalogTable.
** After some time, alter the table's options or drop a constraint. By this
time the source table is already a CatalogTable impl, so getSchema returns null
and an NPE happens (see the sketch below). This is actually the cause of the CI
failure
https://dev.azure.com/qingyuecqy/Flink/_build/results?buildId=102&view=logs&j=43a593e7-535d-554b-08cc-244368da36b4&t=82d122c0-8bbf-56f3-4c0d-8e3d69630d0f
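For illustration, here is a minimal sketch of that failure mode. The class names
are existing Flink APIs, but the scenario is simplified and is not the actual
FLINK-22315 code; it only shows how mixing the two table implementations breaks
code paths that still read the deprecated TableSchema.
{code:java}
import java.util.Collections;
import java.util.HashMap;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;

public class MixedSchemaSketch {

    public static void main(String[] args) {
        // Table as originally persisted through the legacy code path.
        CatalogTable legacyTable =
                new CatalogTableImpl(
                        TableSchema.builder().field("id", DataTypes.INT()).build(),
                        new HashMap<>(),
                        "created with the deprecated CatalogTableImpl");

        // The same table after an ALTER that rewrites it with the new framework.
        CatalogTable alteredTable =
                CatalogTable.of(
                        Schema.newBuilder().column("id", DataTypes.INT()).build(),
                        "rewritten with CatalogTable.of",
                        Collections.emptyList(),
                        new HashMap<>());

        // Old ALTER TABLE code paths still read the deprecated TableSchema.
        TableSchema fromLegacy = legacyTable.getSchema(); // non-null
        TableSchema fromNew = alteredTable.getSchema();   // default implementation returns null
        fromNew.getFieldNames();                          // NullPointerException
    }
}
{code}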
So I suggest unifying the underlying implementation and completely adapting to
the new framework before supporting this feature.
> ALTER TABLE statement enhancement
> ---------------------------------
>
> Key: FLINK-21634
> URL: https://issues.apache.org/jira/browse/FLINK-21634
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API, Table SQL / Client
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Labels: auto-unassigned, stale-assigned
> Fix For: 1.16.0
>
>
> We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only
> supports renaming the table and changing table options. One useful feature of
> the ALTER TABLE statement is modifying the schema. This is also heavily
> required by integrations with data lakes (e.g. Iceberg).
> Therefore, I propose to support the following ALTER TABLE statements (except
> for {{SET}} and {{RENAME TO}}, all of the others are newly introduced syntax):
> {code:sql}
> ALTER TABLE table_name {
>     ADD { <schema_component> | (<schema_component> [, ...]) }
>   | MODIFY { <schema_component> | (<schema_component> [, ...]) }
>   | DROP { column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK }
>   | RENAME old_column_name TO new_column_name
>   | RENAME TO new_table_name
>   | SET (key1=val1, ...)
>   | RESET (key1, ...)
> }
>
> <schema_component>::
>   { <column_component> | <constraint_component> | <watermark_component> }
>
> <column_component>::
>   column_name <column_definition> [FIRST | AFTER column_name]
>
> <constraint_component>::
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
>
> <watermark_component>::
>   WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
>
> <column_definition>::
>   { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]
>
> <physical_column_definition>::
>   column_type
>
> <metadata_column_definition>::
>   column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
>
> <computed_column_definition>::
>   AS computed_column_expression
> {code}
> And some examples:
> {code:sql}
> -- add a new column
> ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
> -- add columns, constraint, and watermark
> ALTER TABLE mytable ADD (
>   log_ts STRING COMMENT 'log timestamp string' FIRST,
>   ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
>   PRIMARY KEY (id) NOT ENFORCED,
>   WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> );
> -- modify a column type
> ALTER TABLE prod.db.sample MODIFY measurement double COMMENT 'unit is bytes per second' AFTER `id`;
> -- modify the definitions of columns log_ts and ts, the primary key, and the watermark; they must already exist in the table schema
> ALTER TABLE mytable MODIFY (
>   log_ts STRING COMMENT 'log timestamp string' AFTER `id`, -- reorder columns
>   ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
>   PRIMARY KEY (id) NOT ENFORCED,
>   WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> );
> -- drop an old column
> ALTER TABLE prod.db.sample DROP measurement;
> -- drop columns
> ALTER TABLE prod.db.sample DROP (col1, col2, col3);
> -- drop a watermark
> ALTER TABLE prod.db.sample DROP WATERMARK;
> -- rename column name
> ALTER TABLE prod.db.sample RENAME `data` TO payload;
> -- rename table name
> ALTER TABLE mytable RENAME TO mytable2;
> -- set options
> ALTER TABLE kafka_table SET (
>   'scan.startup.mode' = 'specific-offsets',
>   'scan.startup.specific-offsets' = 'partition:0,offset:42'
> );
> -- reset options
> ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
> {code}
> Note: we don't need to introduce new interfaces, because all ALTER TABLE
> operations will be forwarded to the catalog through
> {{Catalog#alterTable(tablePath, newTable, ignoreIfNotExists)}}.
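> As a rough illustration of that forwarding (a hypothetical helper, not the
> actual planner code), an ALTER TABLE ... SET could be handled roughly like this:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flink.table.catalog.Catalog;
> import org.apache.flink.table.catalog.CatalogTable;
> import org.apache.flink.table.catalog.ObjectPath;
> import org.apache.flink.table.catalog.exceptions.TableNotExistException;
>
> public class AlterTableSetSketch {
>
>     // Hypothetical helper: merge new options into the existing table and write it back.
>     static void alterTableOptions(
>             Catalog catalog, ObjectPath tablePath, Map<String, String> newOptions)
>             throws TableNotExistException {
>         CatalogTable oldTable = (CatalogTable) catalog.getTable(tablePath);
>         Map<String, String> merged = new HashMap<>(oldTable.getOptions());
>         merged.putAll(newOptions);
>         // No new interface is needed: the updated table is handed back through
>         // the existing Catalog#alterTable(tablePath, newTable, ignoreIfNotExists).
>         catalog.alterTable(tablePath, oldTable.copy(merged), false);
>     }
> }
> {code}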
> [1]: [https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table]
> [2]: [http://iceberg.apache.org/spark-ddl/#alter-table-alter-column]
> [3]: [https://trino.io/docs/current/sql/alter-table.html]
> [4]: [https://dev.mysql.com/doc/refman/8.0/en/alter-table.html]
> [5]: [https://www.postgresql.org/docs/9.1/sql-altertable.html]