Copilot commented on code in PR #3893: URL: https://github.com/apache/flink-cdc/pull/3893#discussion_r2720418299
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.event;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT} or {@code ALTER TABLE
+ * SET COMMENT} DDL.
+ */
+@PublicEvolving
+public class AlterTableCommentEvent implements SchemaChangeEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableId tableId;
+    private final String comment;
+
+    public AlterTableCommentEvent(TableId tableId, String comment) {
+        Preconditions.checkArgument(comment != null, "comment should not be empty.");

Review Comment:
   The error message says "comment should not be empty" but the check is for `comment != null`. The message is misleading because a null comment and an empty comment are different. The message should be "comment should not be null" or the check should also verify emptiness with `!comment.isEmpty()`.
```suggestion
        Preconditions.checkArgument(comment != null, "comment should not be null.");
```

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:
##########
@@ -247,6 +250,26 @@ public static void truncateTable(MaxComputeOptions options, TableId tableId)
         table.truncate();
     }

+    public static void alterTableComment(MaxComputeOptions options, TableId tableId, String comment)
+            throws OdpsException {
+        Odps odps = MaxComputeUtils.getOdps(options);
+        String sql =
+                "alter table "
+                        + getFullTableName(options, tableId)
+                        + " set comment '"
+                        + comment
+                        + "';";

Review Comment:
   The comment string is directly interpolated into the SQL statement without proper escaping at line 260. This could lead to SQL injection vulnerabilities or SQL syntax errors if the comment contains special characters like single quotes. The comment should be properly escaped before being added to the SQL string.

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java:
##########
@@ -242,7 +242,11 @@ public void alterDropColumns(String databaseName, String tableName, List<String>

     @Override
     public void alterColumnType(
-            String databaseName, String tableName, String columnName, DataType dataType) {
+            String databaseName,
+            String tableName,
+            String columnName,
+            DataType dataType,
+            String comment) {

Review Comment:
   The `comment` parameter is accepted by this method signature but is never used in the implementation. The ALTER_COLUMN_TYPE_DDL template does not include a comment placeholder. If column comments should be supported during column type alterations, the SQL generation should be updated to include the comment.
   If comments are intentionally not supported for this operation in OceanBase MySQL, this should be documented or the parameter usage should be clarified.

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java:
##########
@@ -385,6 +389,32 @@ public void truncateTable(String databaseName, String tableName) {
         }
     }

+    @Override
+    public void alterTable(String schemaName, String tableName, String comment) {
+        String alterTableDDL =
+                String.format(
+                        "ALTER TABLE `%s`.`%s` SET COMMENT '%s'", schemaName, tableName, comment);

Review Comment:
   The comment string is directly interpolated into the SQL statement without proper escaping. This could lead to SQL injection vulnerabilities or SQL syntax errors if the comment contains special characters like single quotes. The comment should be escaped using a proper escaping mechanism, or the `quote()` method should be extended to handle string values, not just identifiers.
```suggestion
        String escapedComment = comment == null ? "" : comment.replace("'", "''");
        String alterTableDDL =
                String.format(
                        "ALTER TABLE `%s`.`%s` SET COMMENT '%s'", schemaName, tableName, escapedComment);
```

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:
##########
@@ -155,27 +155,30 @@ public static void addColumns(
      * 'col_comment'';
      */
     public static void alterColumnType(
-            MaxComputeOptions options, TableId tableId, Map<String, DataType> typeMapping)
+            MaxComputeOptions options,
+            TableId tableId,
+            Map<String, DataType> typeMapping,
+            Map<String, String> comments)
             throws OdpsException {
         Odps odps = MaxComputeUtils.getOdps(options);
         String prefix = "alter table " + getFullTableName(options, tableId) + " change column ";
         for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
             String alterColumnSql =
-                    prefix
-                            + entry.getKey()
-                            + " "
-                            + entry.getKey()
-                            + " "
-                            + string(entry.getValue())
-                            + ";";
+                    prefix + entry.getKey() + " " + entry.getKey() + " " + string(entry.getValue());
+            String comment = comments.get(entry.getKey());
+            if (comment == null) {
+                alterColumnSql += ";";
+            } else {
+                alterColumnSql += " comment '" + comment + "';";
+            }

Review Comment:
   The comment string is directly interpolated into the SQL statement without proper escaping at line 174. This could lead to SQL injection vulnerabilities or SQL syntax errors if the comment contains special characters like single quotes. The comment should be properly escaped before being added to the SQL string.

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:
##########
@@ -44,4 +44,20 @@ public boolean dropTable(String databaseName, String tableName)
                 "DROP TABLE " + identifier(databaseName) + "." + identifier(tableName);
         return this.execute(dropTableDDL, databaseName);
     }
+
+    public boolean alterTableComment(String databaseName, String tableName, String comment)
+            throws IOException, IllegalArgumentException {
+        String alterTableCommentDDL =
+                "ALTER TABLE "
+                        + identifier(databaseName)
+                        + "."
+                        + identifier(tableName)
+                        + " MODIFY COMMENT "
+                        + quoted(comment);
+        return this.execute(alterTableCommentDDL, databaseName);
+    }
+
+    private String quoted(String str) {
+        return "\"" + str + "\"";

Review Comment:
   The `quoted()` method does not escape quotes within the string. If the comment contains double quotes, it will break the SQL syntax or potentially allow SQL injection. The string should be properly escaped before being quoted, for example by escaping any internal double quotes with backslashes or doubling them depending on the database's escaping rules.
```suggestion
        String escaped = str.replace("\"", "\\\"");
        return "\"" + escaped + "\"";
```

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -362,9 +364,11 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
                     }

                     Map<String, DataType> typeMapping = new HashMap<>();
-
-                    typeMapping.put(oldColumnName, fromDbzColumn(column, tinyInt1isBit));
-                    changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
+                    AlterColumnTypeEvent alterColumnTypeEvent =
+                            new AlterColumnTypeEvent(currentTable, typeMapping);
+                    alterColumnTypeEvent.addColumnComment(column.name(), column.comment());

Review Comment:
   The column name used for the typeMapping key and the comment should be consistent.
   Currently, line 367 uses `column.name()` as the key for typeMapping, but line 357-360 computes `oldColumnName` which may be a lowercased version of `column.name()` when `isTableIdCaseInsensitive` is true. The typeMapping key should use `oldColumnName` instead of `column.name()` to ensure consistency with the rename logic on line 375 that also uses `oldColumnName`.
```suggestion
                    typeMapping.put(oldColumnName, fromDbzColumn(column, tinyInt1isBit));
                    AlterColumnTypeEvent alterColumnTypeEvent =
                            new AlterColumnTypeEvent(currentTable, typeMapping);
                    alterColumnTypeEvent.addColumnComment(oldColumnName, column.comment());
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
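
Several of the comments above flag the same pattern: a table or column comment concatenated into DDL without escaping. One possible way to centralize the fix is a small literal-quoting helper, sketched below. `SqlLiterals` and `quoteSingle` are hypothetical names, not part of flink-cdc, and the escaping rules assume a MySQL-style dialect in which backslash is an escape character and a single quote is escaped by doubling it. Each target system (MaxCompute, OceanBase, Doris) should be checked against its own string-literal rules before adopting anything like this.

```java
public final class SqlLiterals {

    private SqlLiterals() {}

    /**
     * Hypothetical helper: renders a value as a single-quoted SQL string literal.
     * Assumes MySQL-style rules: backslash is an escape character and a single
     * quote inside the literal is written as two single quotes.
     */
    public static String quoteSingle(String value) {
        if (value == null) {
            throw new IllegalArgumentException("value should not be null.");
        }
        String escaped =
                value.replace("\\", "\\\\") // escape backslashes first
                        .replace("'", "''"); // then double each single quote
        return "'" + escaped + "'";
    }

    public static void main(String[] args) {
        // A comment that would otherwise terminate the literal early.
        String comment = "owner's table'; DROP TABLE t; --";
        // The ALTER TABLE shape is copied from the diff above; "db" and "tbl" are placeholders.
        String ddl =
                String.format(
                        "ALTER TABLE `%s`.`%s` SET COMMENT %s", "db", "tbl", quoteSingle(comment));
        System.out.println(ddl);
    }
}
```

The helper only covers string values. Identifiers such as the schema and table names still need their own quoting, and a null comment is rejected here rather than silently turned into an empty string, which is a design choice the connector authors may prefer to make differently.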
