LadyForest commented on code in PR #21577:
URL: https://github.com/apache/flink/pull/21577#discussion_r1059352101
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
return new AddWatermark(watermarkSpec);
}
+ /**
+ * A table change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are built in the {@link
Review Comment:
Nit: built in -> represented by
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
return new AddWatermark(watermarkSpec);
}
+ /**
+ * A table change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are built in the {@link
+ * TableChange#modifyPhysicalColumnType}, {@link
TableChange#modifyColumnName}, {@link
+ * TableChange#modifyColumnComment} and {@link
TableChange#modifyColumnPosition}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition>
COMMENT '<column_comment>' <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newColumn the definition of the new column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumn modify(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition
columnPosition) {
+ return new ModifyColumn(oldColumn, newColumn, columnPosition);
+ }
+
+ /**
+ * A table change that modify the physical column data type.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<new_column_type>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newType the type of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn,
DataType newType) {
+ return new ModifyPhysicalColumnType(oldColumn, newType);
+ }
+
+ /**
+ * A table change to modify the column name.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RENAME <old_column_name> TO
<new_column_name>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newName the name of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnName modifyColumnName(Column oldColumn, String newName)
{
+ return new ModifyColumnName(oldColumn, newName);
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newComment the modified comment.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnComment modifyColumnComment(Column oldColumn, String
newComment) {
+ return new ModifyColumnComment(oldColumn, newComment);
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnPosition modifyColumnPosition(
+ Column oldColumn, ColumnPosition columnPosition) {
+ return new ModifyColumnPosition(oldColumn, columnPosition);
+ }
+
+ /**
+ * A table change to add a unique constraint.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY PRIMARY KEY
(<column_name>...) NOT ENFORCED;
+ * </pre>
+ *
+ * @param newConstraint the modified constraint definition.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyUniqueConstraint modify(UniqueConstraint newConstraint) {
+ return new ModifyUniqueConstraint(newConstraint);
+ }
+
+ /**
+ * A table change to add a watermark.
Review Comment:
Should this say "modify" instead of "add" (i.e. "A table change to modify a watermark")?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -151,12 +157,35 @@ public static Operation convertChangeColumn(
TableSchema oldSchema = catalogTable.getSchema();
boolean first = changeColumn.isFirst();
String after = changeColumn.getAfter() == null ? null :
changeColumn.getAfter().getSimple();
- TableColumn newTableColumn =
toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+ TableColumn.PhysicalColumn newTableColumn =
+ toTableColumn(changeColumn.getNewColumn(), sqlValidator);
TableSchema newSchema = changeColumn(oldSchema, oldName,
newTableColumn, first, after);
Map<String, String> newProperties = new
HashMap<>(catalogTable.getOptions());
newProperties.putAll(extractProperties(changeColumn.getProperties()));
- return new AlterTableSchemaOperation(
+
+ List<TableChange> tableChanges =
+ buildColumnChange(
+ catalogTable
+ .getResolvedSchema()
+ .getColumn(oldName)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ "Failed to get old
column: " + oldName)),
+ Column.physical(newTableColumn.getName(),
newTableColumn.getType())
+ .withComment(
+ changeColumn
+ .getNewColumn()
Review Comment:
Add a util method?
```java
@Nullable
private static String getComment(SqlTableColumn column) {
return column.getComment()
.map(SqlCharStringLiteral.class::cast)
.map(c -> c.getValueAs(String.class))
.orElse(null);
}
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -276,6 +409,359 @@ public String toString() {
}
}
+ //
--------------------------------------------------------------------------------------------
+ // Modify Change
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * A base schema change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are defined in the {@link
ModifyPhysicalColumnType},
+ * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link
ModifyColumnName}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition>
COMMENT '<column_comment>' <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumn implements TableChange {
+
+ protected final Column oldColumn;
+ protected final Column newColumn;
+
+ protected final @Nullable ColumnPosition newPosition;
+
+ public ModifyColumn(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition
newPosition) {
+ this.oldColumn = oldColumn;
+ this.newColumn = newColumn;
+ this.newPosition = newPosition;
+ }
+
+ /** Returns the original {@link Column} instance. */
+ public Column getOldColumn() {
+ return oldColumn;
+ }
+
+ /** Returns the modified {@link Column} instance. */
+ public Column getNewColumn() {
+ return newColumn;
+ }
+
+ /**
+ * Returns the position of the modified {@link Column} instance. When
the return value is
+ * null, it means modify the column at the original position. When the
return value is
+ * FIRST, it means move the modified column to the first. When the
return value is AFTER, it
+ * means move the column after the referred column.
+ */
+ public @Nullable ColumnPosition getNewPosition() {
+ return newPosition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ModifyColumn)) {
+ return false;
+ }
+ ModifyColumn that = (ModifyColumn) o;
+ return Objects.equals(oldColumn, that.oldColumn)
+ && Objects.equals(newColumn, that.newColumn)
+ && Objects.equals(newPosition, that.newPosition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(oldColumn, newColumn, newPosition);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumn{"
+ + "oldColumn="
+ + oldColumn
+ + ", newColumn="
+ + newColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnComment extends ModifyColumn {
+
+ private final String newComment;
+
+ private ModifyColumnComment(Column oldColumn, String newComment) {
+ super(oldColumn, oldColumn.withComment(newComment), null);
+ this.newComment = newComment;
+ }
+
+ /** Get the new comment for the column. */
+ public String getNewComment() {
+ return newComment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnComment) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnComment{"
+ + "Column="
+ + oldColumn
+ + ", newComment='"
+ + newComment
+ + '\''
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnPosition extends ModifyColumn {
+
+ public ModifyColumnPosition(Column oldColumn, ColumnPosition
newPosition) {
+ super(oldColumn, oldColumn, newPosition);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnPosition) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnPosition{"
+ + "Column="
+ + oldColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change that modify the physical column data type.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<new_column_type>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyPhysicalColumnType extends ModifyColumn {
+
+ private ModifyPhysicalColumnType(Column oldColumn, DataType newType) {
+ super(oldColumn, oldColumn.copy(newType), null);
+ Preconditions.checkArgument(oldColumn.isPhysical());
+ }
+
+ /** Get the column type for the new column. */
+ public DataType getNewType() {
+ return newColumn.getDataType();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyPhysicalColumnType) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyPhysicalColumnType{"
+ + "Column="
+ + oldColumn
+ + ", newType="
+ + getNewType()
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column name.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RENAME <old_column_name> TO
<new_column_name>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnName extends ModifyColumn {
+
+ private ModifyColumnName(Column oldColumn, String newName) {
+ super(oldColumn, createNewColumn(oldColumn, newName), null);
+ }
+
+ private static Column createNewColumn(Column oldColumn, String
newName) {
+ if (oldColumn instanceof Column.PhysicalColumn) {
+ return Column.physical(newName, oldColumn.getDataType())
+ .withComment(oldColumn.comment);
+ } else if (oldColumn instanceof Column.MetadataColumn) {
+ Column.MetadataColumn metadataColumn = (Column.MetadataColumn)
oldColumn;
+ return Column.metadata(
+ newName,
+ oldColumn.getDataType(),
+ metadataColumn.getMetadataKey().orElse(null),
+ metadataColumn.isVirtual())
+ .withComment(oldColumn.comment);
+ } else {
+ return Column.computed(newName, ((Column.ComputedColumn)
oldColumn).getExpression())
+ .withComment(oldColumn.comment);
+ }
+ }
+
+ /** Returns the origin column name. */
+ public String getOldColumnName() {
+ return oldColumn.getName();
+ }
+
+ /** Returns the new column name after renaming the column name. */
+ public String getNewColumnName() {
+ return newColumn.getName();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnName) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnName{"
+ + "Column="
+ + oldColumn
+ + ", newName="
+ + getNewColumnName()
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify an unique constraint.
Review Comment:
Nit: an -> a
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -94,6 +96,137 @@ static AddWatermark add(WatermarkSpec watermarkSpec) {
return new AddWatermark(watermarkSpec);
}
+ /**
+ * A table change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are built in the {@link
+ * TableChange#modifyPhysicalColumnType}, {@link
TableChange#modifyColumnName}, {@link
+ * TableChange#modifyColumnComment} and {@link
TableChange#modifyColumnPosition}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition>
COMMENT '<column_comment>' <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newColumn the definition of the new column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumn modify(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition
columnPosition) {
+ return new ModifyColumn(oldColumn, newColumn, columnPosition);
+ }
+
+ /**
+ * A table change that modify the physical column data type.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<new_column_type>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newType the type of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn,
DataType newType) {
+ return new ModifyPhysicalColumnType(oldColumn, newType);
+ }
+
+ /**
+ * A table change to modify the column name.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RENAME <old_column_name> TO
<new_column_name>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newName the name of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnName modifyColumnName(Column oldColumn, String newName)
{
+ return new ModifyColumnName(oldColumn, newName);
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newComment the modified comment.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnComment modifyColumnComment(Column oldColumn, String
newComment) {
+ return new ModifyColumnComment(oldColumn, newComment);
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnPosition modifyColumnPosition(
+ Column oldColumn, ColumnPosition columnPosition) {
+ return new ModifyColumnPosition(oldColumn, columnPosition);
+ }
+
+ /**
+ * A table change to add a unique constraint.
Review Comment:
Found a typo here: add -> modify
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -698,7 +782,7 @@ private AlterSchemaStrategy
computeAlterSchemaStrategy(SqlAlterTableSchema alter
alterTableSchema.getClass().getCanonicalName()));
}
- private static <T> T unwrap(Optional<T> value) {
+ private <T> T unwrap(Optional<T> value) {
Review Comment:
Why change this method from static to non-static?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -276,6 +409,359 @@ public String toString() {
}
}
+ //
--------------------------------------------------------------------------------------------
+ // Modify Change
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * A base schema change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>change the computed expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are defined in the {@link
ModifyPhysicalColumnType},
+ * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link
ModifyColumnName}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition>
COMMENT '<column_comment>' <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumn implements TableChange {
+
+ protected final Column oldColumn;
+ protected final Column newColumn;
+
+ protected final @Nullable ColumnPosition newPosition;
+
+ public ModifyColumn(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition
newPosition) {
+ this.oldColumn = oldColumn;
+ this.newColumn = newColumn;
+ this.newPosition = newPosition;
+ }
+
+ /** Returns the original {@link Column} instance. */
+ public Column getOldColumn() {
+ return oldColumn;
+ }
+
+ /** Returns the modified {@link Column} instance. */
+ public Column getNewColumn() {
+ return newColumn;
+ }
+
+ /**
+ * Returns the position of the modified {@link Column} instance. When
the return value is
+ * null, it means modify the column at the original position. When the
return value is
+ * FIRST, it means move the modified column to the first. When the
return value is AFTER, it
+ * means move the column after the referred column.
+ */
+ public @Nullable ColumnPosition getNewPosition() {
+ return newPosition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ModifyColumn)) {
+ return false;
+ }
+ ModifyColumn that = (ModifyColumn) o;
+ return Objects.equals(oldColumn, that.oldColumn)
+ && Objects.equals(newColumn, that.newColumn)
+ && Objects.equals(newPosition, that.newPosition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(oldColumn, newColumn, newPosition);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumn{"
+ + "oldColumn="
+ + oldColumn
+ + ", newColumn="
+ + newColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnComment extends ModifyColumn {
+
+ private final String newComment;
+
+ private ModifyColumnComment(Column oldColumn, String newComment) {
+ super(oldColumn, oldColumn.withComment(newComment), null);
+ this.newComment = newComment;
+ }
+
+ /** Get the new comment for the column. */
+ public String getNewComment() {
+ return newComment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnComment) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnComment{"
+ + "Column="
+ + oldColumn
+ + ", newComment='"
+ + newComment
+ + '\''
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name>
<original_column_type> <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnPosition extends ModifyColumn {
+
+ public ModifyColumnPosition(Column oldColumn, ColumnPosition
newPosition) {
+ super(oldColumn, oldColumn, newPosition);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnPosition) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnPosition{"
+ + "Column="
+ + oldColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change that modify the physical column data type.
Review Comment:
Do we need to support modifying the metadata column type as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]