Ivan-gfan opened a new issue, #6799:
URL: https://github.com/apache/seatunnel/issues/6799

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   When I synchronize from MySQL to MySQL with a FieldMapper transform and set `schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"` on the sink, automatic table creation fails because every column type in the generated DDL is `null`.
   
   The log shows the following DDL statements:
   
   ```text
   2024-05-06 18:48:28,434 INFO  
org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table 
awakening_earth_web.awakening_earth_web.testCreate with action CREATE TABLE 
`testCreate` (
        `id` null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名称', 
        `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
        `type` null NOT NULL COMMENT '数据源类型', 
        `configuration` null NOT NULL COMMENT '数据源连接配置', 
        `description` null NULL COMMENT '数据源描述', 
        `icon` null NULL COMMENT '图标', 
        PRIMARY KEY (`id`)
   ) COMMENT = '';
   2024-05-06 18:48:29,499 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - 
Execute sql : CREATE TABLE `testCreate` (
        `id` null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名称', 
        `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
        `type` null NOT NULL COMMENT '数据源类型', 
        `configuration` null NOT NULL COMMENT '数据源连接配置', 
        `description` null NULL COMMENT '数据源描述', 
        `icon` null NULL COMMENT '图标', 
        PRIMARY KEY (`id`)
   ) COMMENT = '';
   ```
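
The literal `null` in the type position is consistent with how Java's `String.join` treats a `null` element: it appends the text "null" instead of throwing. The following is a minimal sketch of that behavior only. `NullJoinDemo` and `render` are hypothetical names used for illustration, not SeaTunnel code.

```java
import java.util.ArrayList;
import java.util.List;

public class NullJoinDemo {
    // Hypothetical helper: joins the fragments of one column definition with spaces.
    static String render(String quotedName, String columnType, boolean nullable) {
        List<String> parts = new ArrayList<>();
        parts.add(quotedName);
        parts.add(columnType); // may be null when no source type is available
        parts.add(nullable ? "NULL" : "NOT NULL");
        // String.join writes the four characters "null" for a null element.
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(render("`id`", null, false));     // prints: `id` null NOT NULL
        System.out.println(render("`id`", "bigint", false)); // prints: `id` bigint NOT NULL
    }
}
```

This is why the failure only surfaces as a SQL syntax error at execution time rather than as a `NullPointerException` while the statement is being built.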
   
   I traced the code and found a check in the `MysqlCreateTableSqlBuilder` class that tests whether the upstream catalog name is MySQL. If it is, the builder uses `column.getSourceType()`, but the `sourceType` attribute is null, which causes this error:
   ```java
   private String buildColumnIdentifySql(Column column, String catalogName) {
       final List<String> columnSqls = new ArrayList<>();
       columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
       boolean isSupportDef = true;

       if ((SqlType.TIME.equals(column.getDataType().getSqlType())
                       || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
               && column.getScale() != null) {
           BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
           columnSqls.add(typeDefine.getColumnType());
       } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
           columnSqls.add(column.getSourceType());
       } else {
           BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
           columnSqls.add(typeDefine.getColumnType());
       }
       // nullable
       if (column.isNullable()) {
           columnSqls.add("NULL");
       } else {
           columnSqls.add("NOT NULL");
       }

       if (column.getComment() != null) {
           columnSqls.add("COMMENT '" + column.getComment() + "'");
       }

       return String.join(" ", columnSqls);
   }
   ```
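
One possible guard for the same-catalog branch is to reuse the source type only when it is actually present and otherwise fall back to type reconversion. The sketch below illustrates that idea with stand-in types. `Col`, `resolveType`, the `reconvert` function, and the catalog name `"MySQL"` (taken from the log line "using Catalog: MySQL") are assumptions for illustration, not the project's API or its actual fix.

```java
import java.util.function.Function;

public class SourceTypeFallbackDemo {
    // Assumed sink catalog name, copied from the log output in this report.
    static final String SINK_CATALOG = "MySQL";

    // Stand-in for a catalog column; only the fields this sketch needs.
    static final class Col {
        final String name;
        final String sourceType; // null when the upstream step did not carry it over

        Col(String name, String sourceType) {
            this.name = name;
            this.sourceType = sourceType;
        }
    }

    // Hypothetical guard: trust the source type only if the upstream catalog
    // matches the sink AND the value is present; otherwise reconvert.
    static String resolveType(Col col, String upstreamCatalog, Function<Col, String> reconvert) {
        if (SINK_CATALOG.equals(upstreamCatalog) && col.sourceType != null) {
            return col.sourceType;
        }
        return reconvert.apply(col);
    }

    public static void main(String[] args) {
        // Placeholder reconversion; a real converter would derive the type from the column.
        Function<Col, String> reconvert = c -> "VARCHAR(255)";
        System.out.println(resolveType(new Col("name", "varchar(64)"), "MySQL", reconvert));
        System.out.println(resolveType(new Col("name", null), "MySQL", reconvert));
    }
}
```

With a guard of this shape, a column whose `sourceType` is missing would still receive a concrete type, at the cost of possibly differing from the exact type declared in the source table.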
   
   Tracing further, I found that when the sink is built (in `MultipleTableJobConfigParser#parseSink`), the `tableWithActionMap` parameter holds both the source-side table schema and the schema produced by the transform, and the columns in the transform's schema have a null `sourceType`:
   
   ```java
   ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
   String factoryId = getFactoryId(readonlyConfig);
   List<String> inputIds = getInputIds(readonlyConfig);

   List<List<Tuple2<CatalogTable, Action>>> inputVertices =
           inputIds.stream()
                   .map(tableWithActionMap::get)
                   .filter(Objects::nonNull)
                   .collect(Collectors.toList());
   if (inputVertices.isEmpty()) {
       // Tolerates incorrect configuration of simple graph
       inputVertices = Collections.singletonList(findLast(tableWithActionMap));
   } else if (inputVertices.size() > 1) {
       for (List<Tuple2<CatalogTable, Action>> inputVertex : inputVertices) {
           if (inputVertex.size() > 1) {
               throw new JobDefineCheckException(
                       "Sink don't support simultaneous writing of data from multi-table source and other sources.");
           }
       }
   }
   ```
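
The observation that transform output columns carry a null `sourceType` can be pictured as a column being rebuilt under a new name without copying every attribute. The sketch below shows that mechanism in isolation. `Col` and both rename helpers are hypothetical, and this does not claim to reproduce how FieldMapper is actually implemented.

```java
public class RenameSourceTypeDemo {
    // Stand-in for a catalog column; only the fields this sketch needs.
    static final class Col {
        final String name;
        final String sourceType;

        Col(String name, String sourceType) {
            this.name = name;
            this.sourceType = sourceType;
        }
    }

    // Rebuilds the column from its new name alone, so the source type is lost.
    static Col renameDroppingSourceType(Col in, String newName) {
        return new Col(newName, null);
    }

    // Rebuilds the column and carries the source type across the rename.
    static Col renameKeepingSourceType(Col in, String newName) {
        return new Col(newName, in.sourceType);
    }

    public static void main(String[] args) {
        Col source = new Col("plugin_name", "varchar(255)");
        System.out.println(renameDroppingSourceType(source, "plugin_name_test").sourceType); // prints: null
        System.out.println(renameKeepingSourceType(source, "plugin_name_test").sourceType);  // prints: varchar(255)
    }
}
```

If the transform behaves like the first helper, any downstream consumer that reads the source type without a null check will see the value missing, regardless of which database is involved.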
   
   I am not sure whether this result is intended. I have only traced it this far, so please help identify where the problem originates.
   
   Based on this, the error is probably not limited to MySQL-to-MySQL jobs: whenever the source and the sink are the same database type and a FieldMapper transform sits between them, the same error should occur.
   
   
   
   ### SeaTunnel Version
   
   `2.3.4`
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "BATCH"
   }
   
   source{
       Jdbc {
           url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
           driver = "com.mysql.cj.jdbc.Driver"
           connection_check_timeout_sec = 100
           user = "root"
           password = "root"
           database = "test"
           table_list = [
               {
                   table_path = "test.source"
               }
           ]
           result_table_name = "test1"
       }
   }
   
   transform {
     FieldMapper {
       source_table_name = "test1"
       result_table_name = "test2"
       field_mapper = {
           id = id
           name = name
           plugin_name = plugin_name_test
           type = type
           configuration = configuration
           description = description
           icon = icon
       }
     }
   }
   
   sink {
       jdbc {
           url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "root"
           password = "root"
           database = "test"
           table = "test.testCreate"
           generate_sink_sql = true
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
           data_save_mode="APPEND_DATA"
           source_table_name = "test2"
       }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   Ran in cluster mode using the startup class under the `example-module` in the source code
   ```
   
   
   ### Error Exception
   
   ```log
   2024-05-06 18:48:27,337 INFO  
org.apache.seatunnel.api.sink.SaveModeExecuteWrapper - Executing save mode for 
table: awakening_earth_web.awakening_earth_web.testCreate, with SchemaSaveMode: 
CREATE_SCHEMA_WHEN_NOT_EXIST, DataSaveMode: APPEND_DATA using Catalog: MySQL
   2024-05-06 18:48:28,434 INFO  
org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table 
test.awakening_earth_web.testCreate with action CREATE TABLE `testCreate` (
        `id` null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名称', 
        `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
        `type` null NOT NULL COMMENT '数据源类型', 
        `configuration` null NOT NULL COMMENT '数据源连接配置', 
        `description` null NULL COMMENT '数据源描述', 
        `icon` null NULL COMMENT '图标', 
        PRIMARY KEY (`id`)
   ) COMMENT = '';
   2024-05-06 18:48:29,499 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - 
Execute sql : CREATE TABLE `testCreate` (
        `id` null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名称', 
        `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', 
        `type` null NOT NULL COMMENT '数据源类型', 
        `configuration` null NOT NULL COMMENT '数据源连接配置', 
        `description` null NULL COMMENT '数据源描述', 
        `icon` null NULL COMMENT '图标', 
        PRIMARY KEY (`id`)
   ) COMMENT = '';
   2024-05-06 18:48:29,516 INFO  
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - 
Catalog MySQL closing
   2024-05-06 18:48:29,516 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
SHUTTING_DOWN
   2024-05-06 18:48:29,519 INFO  
com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 
[seatunnel] [5.1] Removed connection to endpoint: 
[localhost]:5801:7c595412-458a-4afd-9528-f6508f4e1ad6, connection: 
ClientConnection{alive=false, connectionId=1, 
channel=NioChannel{/127.0.0.1:52796->localhost/127.0.0.1:5801}, 
remoteAddress=[localhost]:5801, lastReadTime=2024-05-06 18:48:29.504, 
lastWriteTime=2024-05-06 18:48:29.503, closedTime=2024-05-06 18:48:29.517, 
connected server version=5.1}
   2024-05-06 18:48:29,519 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
CLIENT_DISCONNECTED
   2024-05-06 18:48:29,556 INFO  com.hazelcast.core.LifecycleService - 
hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is 
SHUTDOWN
   2024-05-06 18:48:29,557 INFO  
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
Closed SeaTunnel client......
   2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
   
   
===============================================================================
   
   
   2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
Fatal Error, 
   
   2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
Please submit bug report in https://github.com/apache/seatunnel/issues
   
   2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
Reason:SeaTunnel job executed failed 
   
   2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
Exception 
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: 
SeaTunnel job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:206)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.example.engine.SeaTunnelEngineExample.main(SeaTunnelEngineExample.java:45)
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:671)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:657)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:569)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:216)
        at 
org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:105)
        at 
org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:173)
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:153)
        ... 2 more
   Caused by: 
org.apache.seatunnel.api.table.catalog.exception.CatalogException: 
ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed 
creating table awakening_earth_web.awakening_earth_web.testCreate
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:365)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:351)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
        at 
org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
        at 
org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:669)
        ... 8 more
   Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL 
syntax; check the manual that corresponds to your MySQL server version for the 
right syntax to use near 'null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名�' at line 2
        at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
        at 
com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:371)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:531)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:362)
        ... 15 more
    
   2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
   
===============================================================================
   
   
   
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:206)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.example.engine.SeaTunnelEngineExample.main(SeaTunnelEngineExample.java:45)
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:671)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:657)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:569)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:216)
        at 
org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:105)
        at 
org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:173)
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:153)
        ... 2 more
   Caused by: 
org.apache.seatunnel.api.table.catalog.exception.CatalogException: 
ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed 
creating table awakening_earth_web.awakening_earth_web.testCreate
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:365)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:351)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108)
        at 
org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
        at 
org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
        at 
org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:669)
        ... 8 more
   Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL 
syntax; check the manual that corresponds to your MySQL server version for the 
right syntax to use near 'null NOT NULL COMMENT '主键id', 
        `name` null NOT NULL COMMENT '数据源名�' at line 2
        at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
        at 
com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:371)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:531)
        at 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:362)
        ... 15 more
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   `jdk1.8`
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

