yzeng1618 opened a new issue, #10092:
URL: https://github.com/apache/seatunnel/issues/10092

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   When using a JDBC source with only `query` configured and a Doris sink with the default `save_mode_create_template`, the job fails to auto-create the Doris table. SeaTunnel reports that the table has no primary keys (`COMMON-24`), even though the upstream MySQL table has a primary key.
   
   **Context**
   
   - Source: MySQL via `JDBC` connector
   - Sink: Doris via `Doris` connector
   - Engine: Flink `1.20.1`, job launched as `SeaTunnelFlink` on YARN
   - SeaTunnel version: `2.3.12-SNAPSHOT` (`seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar`)
   - Source table DDL (MySQL):
   
     ```sql
     CREATE TABLE IF NOT EXISTS test_pq_db.test_db_10 (
       `id` bigint(20) AUTO_INCREMENT NOT NULL,
       `name` varchar(100) DEFAULT NULL,
       `age` int(10) DEFAULT NULL,
       `sex` boolean DEFAULT NULL,
       `address` varchar(100) DEFAULT NULL,
       `telephone` char(12) DEFAULT NULL,
       `height` float DEFAULT NULL,
       `weight` double DEFAULT NULL,
       `size` decimal(10,2) DEFAULT NULL,
       `ID_number` varchar(100) DEFAULT NULL,
       `date_time` date DEFAULT NULL,
       `ts` timestamp NULL,
       PRIMARY KEY (`id`)
     );
     ```
   
   The Doris sink uses the default `save_mode_create_template`:
   
   ```sql
   CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
   ${rowtype_primary_key},
   ${rowtype_fields}
   ) ENGINE=OLAP
    UNIQUE KEY (${rowtype_primary_key})
   COMMENT '${comment}'
   DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES (
   "replication_allocation" = "tag.location.default: 1",
   "in_memory" = "false",
   "storage_format" = "V2",
   "disable_auto_compaction" = "false"
   )
   ```
   
   
   <img width="2414" height="1229" alt="Image" src="https://github.com/user-attachments/assets/3a73017d-d483-4401-a3f6-7ac03cd587ff" />
   
   ### SeaTunnel Version
   
   seatunnel 2.3.12
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
     job.name = "mysql_test"
   }
   
   source {
     Jdbc {
       url = "jdbc:mysql://xxxx:xxx/test_pq_db"
       driver = "com.mysql.jdbc.Driver"
       user = "root"
       password = "******"
       query = "select * from `test_pq_db`.`test_db_10` where 1 = 1"
       split.size = 5000
       fetch_size = 2000
     }
   }
   
   sink {
     Doris {
       fenodes = "xxxxx:8035"
       query-port = "9030"
       username = "******"
       password = "******"
       database = "test_pq_db"
       table = "test_db_10"
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
       # schema_save_mode = CREATE_SCHEMA_WHEN_NOT_EXIST (default)
       # data_save_mode = APPEND_DATA (default)
       # save_mode_create_template = <default template with ${rowtype_primary_key}>
     }
   }
   ```
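   
   For comparison, a variant of the source block that also names the table explicitly. This is only a sketch: it assumes the JDBC source's `table_path` option can be combined with `query`, and that SeaTunnel then reads the primary key from the table metadata. That behavior has not been verified against `2.3.12-SNAPSHOT`.
   
   ```conf
   source {
     Jdbc {
       url = "jdbc:mysql://xxxx:xxx/test_pq_db"
       driver = "com.mysql.jdbc.Driver"
       user = "root"
       password = "******"
       # Assumption: table_path lets the connector look up table metadata
       # (including the primary key) that a bare query does not carry.
       table_path = "test_pq_db.test_db_10"
       query = "select * from `test_pq_db`.`test_db_10` where 1 = 1"
       split.size = 5000
       fetch_size = 2000
     }
   }
   ```
   
   If the job succeeds with this variant, that would suggest the primary key is lost only on the query-only path.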
   
   ### Running Command
   
   ```shell
   # Simplified example
   bin/seatunnel-flink.sh \
     --config mysql_to_doris.conf \
     --deploy-mode run-application \
     --target yarn-application
   ```
   
   ### Error Exception
   
   ```log
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.handleSaveMode(SinkExecuteProcessor.java:191) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:139) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:104) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34) ~[seatunnel-flink-15-starter-2.3.12-SNAPSHOT.jar:2.3.12-SNAPSHOT]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_352]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_352]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_352]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_352]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
        ... 12 more
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-24], ErrorDescription:[The table of test_pq_db.test_db_10 has no primary keys, but the template 
    CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
   ${rowtype_primary_key},
   ${rowtype_fields}
   ) ENGINE=OLAP
    UNIQUE KEY (${rowtype_primary_key})
   COMMENT '${comment}'
   DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES (
   "replication_allocation" = "tag.location.default: 1",
   "in_memory" = "false",
   "storage_format" = "V2",
   "disable_auto_compaction" = "false"
   ) 
    which has the place holder named ${rowtype_primary_key}. Please use the option named save_mode_create_template to specify sql template]
   ```
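   
   As the error message suggests, one possible workaround is to set `save_mode_create_template` explicitly so that it no longer contains `${rowtype_primary_key}`. The fragment below is a minimal sketch, not a verified fix. It assumes that `${rowtype_fields}` expands to all columns when no key placeholder is present, that `id` comes first in that expansion (Doris expects key columns to lead the column list), and that hard-coding `id` as the key is acceptable for this table.
   
   ```conf
   sink {
     Doris {
       fenodes = "xxxxx:8035"
       query-port = "9030"
       username = "******"
       password = "******"
       database = "test_pq_db"
       table = "test_db_10"
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
       # Assumption: the key column is written out by hand instead of
       # relying on ${rowtype_primary_key}, which the query-only source
       # does not populate.
       save_mode_create_template = """
       CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
       ${rowtype_fields}
       ) ENGINE=OLAP
       UNIQUE KEY (`id`)
       COMMENT '${comment}'
       DISTRIBUTED BY HASH (`id`)
       PROPERTIES (
       "replication_allocation" = "tag.location.default: 1"
       )
       """
     }
   }
   ```
   
   This only sidesteps the check. It does not explain why the primary key of the MySQL table is missing from the schema that reaches the sink.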
   
   ### Zeta or Flink or Spark Version
   
   Flink: 1.20.1
   
   ### Java or Scala Version
   
   Java: 1.8.0_352
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

