Copilot commented on code in PR #3893:
URL: https://github.com/apache/flink-cdc/pull/3893#discussion_r2720418299


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterTableCommentEvent.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.event;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * A {@link SchemaChangeEvent} that represents an {@code ALTER TABLE COMMENT} or {@code ALTER TABLE
+ * SET COMMENT} DDL.
+ */
+@PublicEvolving
+public class AlterTableCommentEvent implements SchemaChangeEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableId tableId;
+    private final String comment;
+
+    public AlterTableCommentEvent(TableId tableId, String comment) {
+        Preconditions.checkArgument(comment != null, "comment should not be empty.");

Review Comment:
   The error message says "comment should not be empty" but the check is for 
`comment != null`. The message is misleading because a null comment and an 
empty comment are different. The message should be "comment should not be null" 
or the check should also verify emptiness with `!comment.isEmpty()`.
   ```suggestion
           Preconditions.checkArgument(comment != null, "comment should not be null.");
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:
##########
@@ -247,6 +250,26 @@ public static void truncateTable(MaxComputeOptions options, TableId tableId)
         table.truncate();
     }
 
+    public static void alterTableComment(MaxComputeOptions options, TableId tableId, String comment)
+            throws OdpsException {
+        Odps odps = MaxComputeUtils.getOdps(options);
+        String sql =
+                "alter table "
+                        + getFullTableName(options, tableId)
+                        + " set comment '"
+                        + comment
+                        + "';";

Review Comment:
   The comment string is directly interpolated into the SQL statement without 
proper escaping at line 260. This could lead to SQL injection vulnerabilities 
or SQL syntax errors if the comment contains special characters like single 
quotes. The comment should be properly escaped before being added to the SQL 
string.
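
   A minimal sketch of one way to build this statement with an escaped literal, 
assuming the target engine accepts backslash escapes inside single-quoted 
string literals (this should be verified against MaxCompute's SQL rules). The 
class and method names below are hypothetical and not part of the PR:
   ```java
   /** Hypothetical helper for illustration; not part of flink-cdc. */
   final class SqlStringLiterals {
       private SqlStringLiterals() {}

       /**
        * Wraps a value in single quotes. Backslashes are escaped first so that
        * an existing backslash cannot cancel the escape added before a quote.
        * Assumption: the engine treats backslash as the escape character.
        */
       static String quoteLiteral(String value) {
           if (value == null) {
               return "''";
           }
           String escaped = value.replace("\\", "\\\\").replace("'", "\\'");
           return "'" + escaped + "'";
       }

       /** Builds the table-comment statement from an already qualified table name. */
       static String alterTableCommentSql(String fullTableName, String comment) {
           return "alter table " + fullTableName + " set comment " + quoteLiteral(comment) + ";";
       }
   }
   ```
   With such a helper, a comment containing a single quote stays inside the 
literal instead of terminating it early. The same helper could be reused 
wherever a comment is appended to a generated statement.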



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java:
##########
@@ -242,7 +242,11 @@ public void alterDropColumns(String databaseName, String tableName, List<String>
 
     @Override
     public void alterColumnType(
-            String databaseName, String tableName, String columnName, DataType dataType) {
+            String databaseName,
+            String tableName,
+            String columnName,
+            DataType dataType,
+            String comment) {

Review Comment:
   The `comment` parameter is accepted by this method signature but is never 
used in the implementation. The ALTER_COLUMN_TYPE_DDL template does not include 
a comment placeholder. If column comments should be supported during column 
type alterations, the SQL generation should be updated to include the comment. 
If comments are intentionally not supported for this operation in OceanBase 
MySQL, this should be documented or the parameter usage should be clarified.
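
   If the comment is meant to be applied, one possible shape is to append a 
`COMMENT` clause only when a comment is present. The sketch below uses 
standard MySQL `MODIFY COLUMN` syntax; the class, the method names, and the 
pre-rendered `sqlType` argument are hypothetical and do not reflect the actual 
ALTER_COLUMN_TYPE_DDL template:
   ```java
   /** Hypothetical DDL builder for illustration; not part of flink-cdc. */
   final class ColumnTypeDdl {
       private ColumnTypeDdl() {}

       /** Quotes an identifier with backticks, doubling any embedded backtick. */
       static String quoteIdentifier(String name) {
           return "`" + name.replace("`", "``") + "`";
       }

       /** Quotes a string literal: escapes backslashes, doubles single quotes. */
       static String quoteLiteral(String value) {
           return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'";
       }

       /** Builds MODIFY COLUMN DDL; the COMMENT clause is omitted when comment is null. */
       static String modifyColumn(
               String databaseName,
               String tableName,
               String columnName,
               String sqlType,
               String comment) {
           StringBuilder ddl =
                   new StringBuilder("ALTER TABLE ")
                           .append(quoteIdentifier(databaseName))
                           .append('.')
                           .append(quoteIdentifier(tableName))
                           .append(" MODIFY COLUMN ")
                           .append(quoteIdentifier(columnName))
                           .append(' ')
                           .append(sqlType);
           if (comment != null) {
               ddl.append(" COMMENT ").append(quoteLiteral(comment));
           }
           return ddl.toString();
       }
   }
   ```
   Passing `null` keeps the statement identical to a plain type change, so 
callers that have no comment are unaffected.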



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java:
##########
@@ -385,6 +389,32 @@ public void truncateTable(String databaseName, String tableName) {
         }
     }
 
+    @Override
+    public void alterTable(String schemaName, String tableName, String comment) {
+        String alterTableDDL =
+                String.format(
+                        "ALTER TABLE `%s`.`%s` SET COMMENT '%s'", schemaName, tableName, comment);

Review Comment:
   The comment string is directly interpolated into the SQL statement without 
proper escaping. This could lead to SQL injection vulnerabilities or SQL syntax 
errors if the comment contains special characters like single quotes. The 
comment should be escaped using a proper escaping mechanism, or the `quote()` 
method should be extended to handle string values, not just identifiers.
   ```suggestion
           String escapedComment = comment == null ? "" : comment.replace("'", "''");
           String alterTableDDL =
                   String.format(
                           "ALTER TABLE `%s`.`%s` SET COMMENT '%s'",
                           schemaName,
                           tableName,
                           escapedComment);
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:
##########
@@ -155,27 +155,30 @@ public static void addColumns(
      * 'col_comment'';
      */
     public static void alterColumnType(
-            MaxComputeOptions options, TableId tableId, Map<String, DataType> typeMapping)
+            MaxComputeOptions options,
+            TableId tableId,
+            Map<String, DataType> typeMapping,
+            Map<String, String> comments)
             throws OdpsException {
         Odps odps = MaxComputeUtils.getOdps(options);
 
         String prefix = "alter table " + getFullTableName(options, tableId) + " change column ";
 
         for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
             String alterColumnSql =
-                    prefix
-                            + entry.getKey()
-                            + " "
-                            + entry.getKey()
-                            + " "
-                            + string(entry.getValue())
-                            + ";";
+                    prefix + entry.getKey() + " " + entry.getKey() + " " + string(entry.getValue());
+            String comment = comments.get(entry.getKey());
+            if (comment == null) {
+                alterColumnSql += ";";
+            } else {
+                alterColumnSql += " comment '" + comment + "';";
+            }

Review Comment:
   The comment string is directly interpolated into the SQL statement without 
proper escaping at line 174. This could lead to SQL injection vulnerabilities 
or SQL syntax errors if the comment contains special characters like single 
quotes. The comment should be properly escaped before being added to the SQL 
string.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java:
##########
@@ -44,4 +44,20 @@ public boolean dropTable(String databaseName, String tableName)
                 "DROP TABLE " + identifier(databaseName) + "." + identifier(tableName);
         return this.execute(dropTableDDL, databaseName);
     }
+
+    public boolean alterTableComment(String databaseName, String tableName, String comment)
+            throws IOException, IllegalArgumentException {
+        String alterTableCommentDDL =
+                "ALTER TABLE "
+                        + identifier(databaseName)
+                        + "."
+                        + identifier(tableName)
+                        + " MODIFY COMMENT "
+                        + quoted(comment);
+        return this.execute(alterTableCommentDDL, databaseName);
+    }
+
+    private String quoted(String str) {
+        return "\"" + str + "\"";

Review Comment:
   The `quoted()` method does not escape quotes within the string. If the 
comment contains double quotes, it will break the SQL syntax or potentially 
allow SQL injection. The string should be properly escaped before being quoted, 
for example by escaping any internal double quotes with backslashes or doubling 
them depending on the database's escaping rules.
   ```suggestion
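           // Escape backslashes first so a trailing backslash cannot cancel the closing quote.
           String escaped = str.replace("\\", "\\\\").replace("\"", "\\\"");
           return "\"" + escaped + "\"";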
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -362,9 +364,11 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
                     }
 
                     Map<String, DataType> typeMapping = new HashMap<>();
-
-                    typeMapping.put(oldColumnName, fromDbzColumn(column, tinyInt1isBit));
-                    changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
+                    AlterColumnTypeEvent alterColumnTypeEvent =
+                            new AlterColumnTypeEvent(currentTable, typeMapping);
+                    alterColumnTypeEvent.addColumnComment(column.name(), column.comment());

Review Comment:
   The column name used for the typeMapping key and for the comment should be 
consistent. Currently, line 367 uses `column.name()` as the key for 
typeMapping, but lines 357-360 compute `oldColumnName`, which may be a 
lowercased version of `column.name()` when `isTableIdCaseInsensitive` is true. 
The typeMapping key should use `oldColumnName` instead of `column.name()` to 
stay consistent with the rename logic on line 375, which also uses 
`oldColumnName`.
   ```suggestion
                       typeMapping.put(oldColumnName, fromDbzColumn(column, tinyInt1isBit));
                       AlterColumnTypeEvent alterColumnTypeEvent =
                               new AlterColumnTypeEvent(currentTable, typeMapping);
                       alterColumnTypeEvent.addColumnComment(oldColumnName, column.comment());
   ```
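
   To illustrate why the key matters, here is a small standalone sketch. It 
assumes names are lowercased when the table id is case-insensitive, as 
described above; the class and its methods are hypothetical and are not code 
from the parser:
   ```java
   import java.util.HashMap;
   import java.util.Locale;
   import java.util.Map;

   /** Hypothetical illustration of key mismatches; not code from the PR. */
   final class ColumnKeyConsistency {
       private ColumnKeyConsistency() {}

       /** Lowercases the name when case-insensitive handling is enabled. */
       static String normalize(String columnName, boolean caseInsensitive) {
           return caseInsensitive ? columnName.toLowerCase(Locale.ROOT) : columnName;
       }

       /** Returns true if an entry stored under storedKey is found via lookupKey. */
       static boolean lookupSucceeds(String storedKey, String lookupKey) {
           Map<String, String> typeMapping = new HashMap<>();
           typeMapping.put(storedKey, "VARCHAR(32)");
           return typeMapping.containsKey(lookupKey);
       }
   }
   ```
   Storing the entry under the raw name and looking it up by the normalized 
name misses whenever the two differ in case, whereas using the normalized name 
on both sides always matches.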



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
