lvyanquan commented on code in PR #3579:
URL: https://github.com/apache/flink-cdc/pull/3579#discussion_r1734076317


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
@@ -648,6 +648,20 @@ public void testSchemaChangeEvents() throws Exception {
                                     .physicalColumn("timestamp6_c", DataTypes.TIMESTAMP_LTZ(6))
                                     .primaryKey("id")
                                     .build()));
+
+            // Test create table DDL with inline primary key
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL PRIMARY KEY);",

Review Comment:
   Additionally, we could add a test case for a composite primary key declared as a table-level constraint, for example:
   ```
   CREATE TABLE my_table (
       id INT,
       name VARCHAR(255),
       PRIMARY KEY (id, name)
   );
   ```
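
   A minimal sketch of how that DDL could be built in the same `String.format` style the existing test uses. The class name, the helper `compositeKeyDdl`, and the table name `newlyAddedTable3` are hypothetical and not part of this PR; the expected `CreateTableEvent` would presumably declare both columns via `.primaryKey("id", "name")`, which is an assumption to verify against the parser's actual output.

   ```java
   // Hypothetical helper, for illustration only: not part of MySqlPipelineITCase.
   final class CompositeKeyDdlSketch {

       // Formats a CREATE TABLE statement whose primary key is declared as a
       // table-level constraint over two columns, instead of inline on one column.
       static String compositeKeyDdl(String database, String table) {
           return String.format(
                   "CREATE TABLE `%s`.`%s` (id INT, name VARCHAR(255), PRIMARY KEY (id, name));",
                   database, table);
       }

       public static void main(String[] args) {
           // In the integration test, this string would be passed to statement.execute(...).
           System.out.println(compositeKeyDdl("inventory", "newlyAddedTable3"));
       }
   }
   ```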



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -113,14 +113,17 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
                                     .collect(Collectors.toList()));
                     parser.databaseTables().overwriteTable(tableEditor.create());
                     parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+                    Schema.Builder builder = Schema.newBuilder();
+                    tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
+                    if (tableEditor.hasPrimaryKey()) {
+                        builder.primaryKey(tableEditor.primaryKeyColumnNames());
+                    }
+                    changes.add(
+                            new CreateTableEvent(
+                                    toCdcTableId(tableEditor.tableId()), builder.build()));
                 },
                 tableEditor);
-        Schema.Builder builder = Schema.newBuilder();
-        tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
-        if (tableEditor.hasPrimaryKey()) {
-            builder.primaryKey(tableEditor.primaryKeyColumnNames());
-        }
-        changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()), builder.build()));

Review Comment:
   Is this change functionally necessary, or is it only for code style consistency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
