Hisoka-X commented on code in PR #6701:
URL: https://github.com/apache/seatunnel/pull/6701#discussion_r1562528690


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java:
##########
@@ -74,43 +78,86 @@ public TableChange getTableSchema(JdbcConnection jdbc, 
TableId tableId) {
     }
 
     private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
-        final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
-        final String sql = SHOW_CREATE_TABLE + MySqlUtils.quote(tableId);
+        Map<TableId, TableChange> tableChangeMap = new HashMap<>();
         try {
-            jdbc.query(
-                    sql,
-                    rs -> {
-                        if (rs.next()) {
-                            final String ddl = rs.getString(2);
-                            final MySqlOffsetContext offsetContext =
-                                    
MySqlOffsetContext.initial(connectorConfig);
-                            List<SchemaChangeEvent> schemaChangeEvents =
-                                    databaseSchema.parseSnapshotDdl(
-                                            ddl, tableId.catalog(), 
offsetContext, Instant.now());
-                            for (SchemaChangeEvent schemaChangeEvent : 
schemaChangeEvents) {
-                                for (TableChange tableChange :
-                                        schemaChangeEvent.getTableChanges()) {
-                                    Table table =
-                                            
CatalogTableUtils.mergeCatalogTableConfig(
-                                                    tableChange.getTable(), 
tableMap.get(tableId));
-                                    TableChange newTableChange =
-                                            new TableChange(
-                                                    
TableChanges.TableChangeType.CREATE, table);
-                                    tableChangeMap.put(tableId, 
newTableChange);
-                                }
-                            }
-                        }
-                    });
-        } catch (SQLException e) {
-            throw new RuntimeException(
-                    String.format("Failed to read schema for table %s by 
running %s", tableId, sql),
-                    e);
+            tableChangeMap = getTableSchemaByShowCreateTable(jdbc, tableId);
+            if (tableChangeMap.isEmpty()) {
+                log.debug("Load schema is empty for table {}", tableId);
+            }
+        } catch (Exception e) {
+            log.debug("Ignore exception when execute `SHOW CREATE TABLE {}` 
failed", tableId, e);
+        }
+        if (tableChangeMap.isEmpty()) {
+            try {
+                log.info("Fallback to use `DESC {}` load schema", tableId);
+                tableChangeMap = getTableSchemaByDescTable(jdbc, tableId);
+            } catch (SQLException ex) {
+                throw new SeaTunnelException(
+                        String.format("Failed to read schema for table %s by 
running %s", tableId),
+                        ex);
+            }
         }
         if (!tableChangeMap.containsKey(tableId)) {
             throw new RuntimeException(
-                    String.format("Can't obtain schema for table %s by running 
%s", tableId, sql));
+                    String.format("Can't obtain schema for table %s by running 
%s", tableId));

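Review Comment:
   The format string still contains two `%s` placeholders, but the `sql` variable was removed and only `tableId` is passed, so `String.format` would throw `MissingFormatArgumentException` instead of producing this error message. The `SeaTunnelException` message in the `DESC` fallback above has the same problem.
   ```suggestion
                       String.format("Can't obtain schema for table %s", 
tableId));
   ```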



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to