aijing-sun opened a new issue, #5071:
URL: https://github.com/apache/seatunnel/issues/5071

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   When I run the test sync job "mysql-cdc -> starrocks" with the StarRocks save mode enabled, parsing of `save_mode_create_template` goes wrong.
   
   After debugging, I found that the method `org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil#mergeColumnInTemplate` visits the entries of `columnInTemplate` out of `startIndex` order, because it is a HashMap and its iteration order is not guaranteed. The running `offset` is then applied at the wrong positions, so the generated CREATE TABLE SQL is malformed.
   ```java
   private static String mergeColumnInTemplate(
               Map<String, CreateTableParser.ColumnInfo> columnInTemplate,
               TableSchema tableSchema,
               String template) {
           int offset = 0;
           Map<String, Column> columnMap =
                   tableSchema.getColumns().stream()
                           .collect(Collectors.toMap(Column::getName, Function.identity()));
           for (String col : columnInTemplate.keySet()) {
               CreateTableParser.ColumnInfo columnInfo = columnInTemplate.get(col);
               if (StringUtils.isEmpty(columnInfo.getInfo())) {
                   if (columnMap.containsKey(col)) {
                       Column column = columnMap.get(col);
                       String newCol = columnToStarrocksType(column);
                       String prefix = template.substring(0, columnInfo.getStartIndex() + offset);
                       String suffix = template.substring(offset + columnInfo.getEndIndex());
                       if (prefix.endsWith("`")) {
                           prefix = prefix.substring(0, prefix.length() - 1);
                           offset--;
                       }
                       if (suffix.startsWith("`")) {
                           suffix = suffix.substring(1);
                           offset--;
                       }
                       template = prefix + newCol + suffix;
                       offset += newCol.length() - columnInfo.getName().length();
                   } else {
                       throw new IllegalArgumentException("Can't find column " + col + " in table.");
                   }
               }
           }
           return template;
       }
   ```
   
   One solution could be to sort the `columnInTemplate` values by `startIndex` before the for-loop, so that placeholders are replaced from left to right and `offset` stays consistent.
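
   The sorting idea can be sketched as below. This is a minimal, self-contained illustration and not the actual SeaTunnel code: `SortedTemplateMerge`, its nested `ColumnInfo`, and the `definitions` map are hypothetical stand-ins for `StarRocksSaveModeUtil`, `CreateTableParser.ColumnInfo`, and the `columnToStarrocksType` lookup, and the backtick trimming and the `getInfo()` check from the real method are left out.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class SortedTemplateMerge {

       // Hypothetical stand-in for CreateTableParser.ColumnInfo; only the
       // fields this sketch needs are modeled.
       static final class ColumnInfo {
           final String name;
           final int startIndex; // inclusive position of the placeholder
           final int endIndex;   // exclusive position of the placeholder

           ColumnInfo(String name, int startIndex, int endIndex) {
               this.name = name;
               this.startIndex = startIndex;
               this.endIndex = endIndex;
           }
       }

       // Orders the placeholders by their position in the template, so the
       // result no longer depends on the map's iteration order.
       static List<ColumnInfo> sortByStartIndex(Map<String, ColumnInfo> columnInTemplate) {
           List<ColumnInfo> sorted = new ArrayList<>(columnInTemplate.values());
           sorted.sort(Comparator.comparingInt((ColumnInfo c) -> c.startIndex));
           return sorted;
       }

       // Simplified replacement loop (assumption: definitions maps a column
       // name to its full column definition). The running offset is only
       // valid because placeholders are visited from left to right.
       static String merge(
               String template,
               Map<String, ColumnInfo> columnInTemplate,
               Map<String, String> definitions) {
           int offset = 0;
           for (ColumnInfo info : sortByStartIndex(columnInTemplate)) {
               String newCol = definitions.get(info.name);
               if (newCol == null) {
                   throw new IllegalArgumentException(
                           "Can't find column " + info.name + " in table.");
               }
               String prefix = template.substring(0, info.startIndex + offset);
               String suffix = template.substring(info.endIndex + offset);
               template = prefix + newCol + suffix;
               offset += newCol.length() - (info.endIndex - info.startIndex);
           }
           return template;
       }

       public static void main(String[] args) {
           Map<String, ColumnInfo> columns = new HashMap<>();
           columns.put("b", new ColumnInfo("b", 19, 20));
           columns.put("a", new ColumnInfo("a", 16, 17));

           Map<String, String> definitions = new HashMap<>();
           definitions.put("a", "`a` INT");
           definitions.put("b", "`b` STRING");

           System.out.println(merge("CREATE TABLE t (a, b)", columns, definitions));
       }
   }
   ```

   Because the placeholders are sorted first, the output is the same whatever order the HashMap happens to iterate in. Sorting inside the merge method keeps the change local to one place; whether the real fix should live there or in the parser is a separate design decision.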
   
   ### SeaTunnel Version
   
   2.3.2
   
   ### SeaTunnel Config
   
   ```conf
   {
       "env" : {
           "job.mode" : "STREAMING",
           "dag-parsing.mode" : "MULTIPLEX"
       },
       "source" : [
           {
               "base-url" : "xxxx",
               "password" : "xxxx",
               "table-pattern" : ".*",
               "catalog" : {
                   "factory" : "MySQL",
                   "name" : "mysql-source",
                   "table-pattern" : ".*"
               },
               "parallelism" : 1,
               "table-names" : [
                   "tpch.customer",
                   "tpch.lineitem",
                   "tpch.metrics"
               ],
               "database-pattern" : "tpch",
               "plugin_name" : "MySQL-CDC",
               "server-id" : "6000",
               "username" : "root"
           }
       ],
       "sink" : [
           {
               "base-url" : "xxx",
               "password" : "xxx",
               "database" : "test_aijing_11",
               "nodeUrls" : [
                   "xxxx"
               ],
               "plugin_name" : "StarRocks",
               "username" : "root"
           }
       ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh -c data/mysql_to_sr_db_sync.conf -e cluster
   ```
   
   
   ### Error Exception
   
   ```log
   Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed create table in catalog StarRocks, sql :[CREATE TABLE IF NOT EXISTS `test_aijing_11`.`lineitem` (
   `L_ORDERKEY`,`L`L_ORDERKEY` INT NOT NULL R` INT NOT NULL ,
   `L_PARTKEY` INT NOT NULL ,
   `L_SUPPKEY` INT NOT NULL ,
   `L_QUANTITY` Decimal(15, 2) NOT NULL ,
   `L_EXTENDEDPRICE` Decimal(15, 2) NOT NULL ,
   `L_DISCOUNT` Decimal(15, 2) NOT NULL ,
   `L_TAX` Decimal(15, 2) NOT NULL ,
   `L_RETURNFLAG` STRING NOT NULL ,
   `L_LINESTATUS` STRING NOT NULL ,
   `L_SHIPDATE` DATE NOT NULL ,
   `L_COMMITDATE` DATE NOT NULL ,
   `L_RECEIPTDATE` DATE NOT NULL ,
   `L_SHIPINSTRUCT` STRING NOT NULL ,
   `L_SHIPMODE` STRING NOT NULL ,
   `L_COMMENT` STRING NOT NULL
   ) ENGINE=OLAP
    PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`)
   DISTRIBUTED BY HASH (`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (
       "replication_num" = "1"
   )]
        at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.createTable(StarRocksCatalog.java:334)
        at org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSink.autoCreateTable(StarRocksSink.java:95)
        at org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSink.handleSaveMode(StarRocksSink.java:132)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:620)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:611)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:557)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:170)
        at org.apache.seatunnel.engine.client.job.JobExecutionEnvironment.getLogicalDag(JobExecutionEnvironment.java:155)
        at org.apache.seatunnel.engine.client.job.JobExecutionEnvironment.execute(JobExecutionEnvironment.java:147)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:140)
        ... 2 more
   Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'L_ORDERKEY' at line 2
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:768)
        at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:653)
        at org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog.createTable(StarRocksCatalog.java:331)
        ... 11 more
   ```
   
   
   ### Flink or Spark Version
   
   None
   
   ### Java or Scala Version
   
   jdk 1.8
   
   ### Screenshots
   
   
![image](https://github.com/apache/seatunnel/assets/7359662/7ee4bd89-7d10-4c90-9069-73c50359ade8)
   
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

