Ivan-gfan opened a new issue, #6799: URL: https://github.com/apache/seatunnel/issues/6799
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened When I synchronized from MySQL to MySQL, fieldMapper of transform was added, and when `schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"`was configured at the sink, an error occurred in the automatic table creation function, and all field types were null. The following DDL statements are viewed in the log ```text 2024-05-06 18:48:28,434 INFO org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table awakening_earth_web.awakening_earth_web.testCreate with action CREATE TABLE `testCreate` ( `id` null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名称', `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', `type` null NOT NULL COMMENT '数据源类型', `configuration` null NOT NULL COMMENT '数据源连接配置', `description` null NULL COMMENT '数据源描述', `icon` null NULL COMMENT '图标', PRIMARY KEY (`id`) ) COMMENT = ''; 2024-05-06 18:48:29,499 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Execute sql : CREATE TABLE `testCreate` ( `id` null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名称', `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', `type` null NOT NULL COMMENT '数据源类型', `configuration` null NOT NULL COMMENT '数据源连接配置', `description` null NULL COMMENT '数据源描述', `icon` null NULL COMMENT '图标', PRIMARY KEY (`id`) ) COMMENT = ''; ``` I traced the code and found that there was a judgment in the `MysqlCreateTableSqlBuilder` class to determine if the previous catalogname was the current mysql, and if so, get the column.getSourceType, but the sourceType attribute is null, which causes this error ```java private String buildColumnIdentifySql(Column column, String catalogName) { final List<String> columnSqls = new ArrayList<>(); columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); boolean isSupportDef = true; if ((SqlType.TIME.equals(column.getDataType().getSqlType()) || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType())) && column.getScale() != null) { BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column); columnSqls.add(typeDefine.getColumnType()); } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) { columnSqls.add(column.getSourceType()); } else { BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column); columnSqls.add(typeDefine.getColumnType()); } // nullable if (column.isNullable()) { columnSqls.add("NULL"); } else { columnSqls.add("NOT NULL"); } if (column.getComment() != null) { columnSqls.add("COMMENT '" + column.getComment() + "'"); } return String.join(" ", columnSqls); } ``` I tracked the code further and found that when sink was built(Class `MultipleTableJobConfigParser#parseSink`), there was a `tableWithActionMap` parameter, which stored the field structure of the source side and the field structure of transform, and the sourceType of the field structure of transform was null ```java ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig); String factoryId = getFactoryId(readonlyConfig); List<String> inputIds = getInputIds(readonlyConfig); List<List<Tuple2<CatalogTable, Action>>> inputVertices = inputIds.stream() .map(tableWithActionMap::get) .filter(Objects::nonNull) .collect(Collectors.toList()); if (inputVertices.isEmpty()) { // Tolerates incorrect configuration of simple graph inputVertices = Collections.singletonList(findLast(tableWithActionMap)); } else if (inputVertices.size() > 1) { for (List<Tuple2<CatalogTable, Action>> inputVertex : inputVertices) { if (inputVertex.size() > 1) { throw new JobDefineCheckException( "Sink don't support simultaneous writing of data from multi-table source and other sources."); } } } ``` It is not clear for the time being whether such a result should be obtained. Now I have only tracked it here, please help to see where the problem occurred From this point of view, this error should not occur only from mysql to mysql, but if the source side and the target side are of the same type, and there is a transform fieldmapper, the same error will occur ### SeaTunnel Version `2.3.4` ### SeaTunnel Config ```conf env { parallelism = 1 job.mode = "BATCH" } source{ Jdbc { url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 100 user = "root" password = "root" database = "test" table_list = [ { table_path = "test.source" } ] result_table_name = "test1" } } transform { FieldMapper { source_table_name = "test1" result_table_name = "test2" field_mapper = { id = id name = name plugin_name = plugin_name_test type = type configuration = configuration description = description icon = icon } } } sink { jdbc { url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "root" database = "test" table = "test.testCreate" generate_sink_sql = true schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" source_table_name = "test2" } } ``` ### Running Command ```shell The cluster mode is used to run the startup class under the `example-module` in the source code ``` ### Error Exception ```log 2024-05-06 18:48:27,337 INFO org.apache.seatunnel.api.sink.SaveModeExecuteWrapper - Executing save mode for table: awakening_earth_web.awakening_earth_web.testCreate, with SchemaSaveMode: CREATE_SCHEMA_WHEN_NOT_EXIST, DataSaveMode: APPEND_DATA using Catalog: MySQL 2024-05-06 18:48:28,434 INFO org.apache.seatunnel.api.sink.DefaultSaveModeHandler - Creating table test.awakening_earth_web.testCreate with action CREATE TABLE `testCreate` ( `id` null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名称', `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', `type` null NOT NULL COMMENT '数据源类型', `configuration` null NOT NULL COMMENT '数据源连接配置', `description` null NULL COMMENT '数据源描述', `icon` null NULL COMMENT '图标', PRIMARY KEY (`id`) ) COMMENT = ''; 2024-05-06 18:48:29,499 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Execute sql : CREATE TABLE `testCreate` ( `id` null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名称', `plugin_name_test` null NOT NULL COMMENT '数据源插件名称', `type` null NOT NULL COMMENT '数据源类型', `configuration` null NOT NULL COMMENT '数据源连接配置', `description` null NULL COMMENT '数据源描述', `icon` null NULL COMMENT '图标', PRIMARY KEY (`id`) ) COMMENT = ''; 2024-05-06 18:48:29,516 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Catalog MySQL closing 2024-05-06 18:48:29,516 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN 2024-05-06 18:48:29,519 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5801:7c595412-458a-4afd-9528-f6508f4e1ad6, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:52796->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-05-06 18:48:29.504, lastWriteTime=2024-05-06 18:48:29.503, closedTime=2024-05-06 18:48:29.517, connected server version=5.1} 2024-05-06 18:48:29,519 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED 2024-05-06 18:48:29,556 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN 2024-05-06 18:48:29,557 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client...... 2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - =============================================================================== 2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error, 2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues 2024-05-06 18:48:29,557 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed 2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:206) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.example.engine.SeaTunnelEngineExample.main(SeaTunnelEngineExample.java:45) Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed] at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:671) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:657) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:569) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:216) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:105) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:173) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:153) ... 2 more Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table awakening_earth_web.awakening_earth_web.testCreate at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:365) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:351) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69) at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38) at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:669) ... 8 more Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名�' at line 2 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:371) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:531) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:362) ... 15 more 2024-05-06 18:48:29,558 ERROR org.apache.seatunnel.core.starter.SeaTunnel - =============================================================================== Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:206) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.example.engine.SeaTunnelEngineExample.main(SeaTunnelEngineExample.java:45) Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed] at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:671) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:657) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:569) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:216) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:105) at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:173) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:153) ... 2 more Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed creating table awakening_earth_web.awakening_earth_web.testCreate at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:365) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:351) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108) at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69) at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38) at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:669) ... 8 more Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'null NOT NULL COMMENT '主键id', `name` null NOT NULL COMMENT '数据源名�' at line 2 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:371) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.executeInternal(AbstractJdbcCatalog.java:531) at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.createTableInternal(AbstractJdbcCatalog.java:362) ... 15 more ``` ### Zeta or Flink or Spark Version _No response_ ### Java or Scala Version `jdk1.8` ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
