This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66b0f1e1d2 [Feature][Connector] add starrocks save_mode (#6029)
66b0f1e1d2 is described below

commit 66b0f1e1d294e60a04e68e66064b6fc945678d9f
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 12 14:37:52 2024 +0800

    [Feature][Connector] add starrocks save_mode (#6029)
---
 docs/en/connector-v2/sink/StarRocks.md             |  95 ++++++++++++---
 .../starrocks/catalog/StarRocksCatalog.java        |  65 +++++++++-
 .../starrocks/catalog/StarRocksCatalogFactory.java |   4 +-
 .../seatunnel/starrocks/config/SinkConfig.java     |   7 +-
 .../starrocks/config/StarRocksSinkOptions.java     |  24 ++--
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  67 +++++------
 .../starrocks/sink/StarRocksSinkFactory.java       |  57 ++++++++-
 .../starrocks/catalog/StarRocksCatalogTest.java    |   2 +-
 .../connector-starrocks-e2e/pom.xml                |   9 ++
 .../e2e/connector/starrocks/StarRocksIT.java       | 131 ++++++++++++---------
 .../starrocks-thrift-to-starrocks-streamload.conf  |   6 +-
 11 files changed, 341 insertions(+), 126 deletions(-)

diff --git a/docs/en/connector-v2/sink/StarRocks.md 
b/docs/en/connector-v2/sink/StarRocks.md
index 940d806ef3..03afca211b 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -20,24 +20,27 @@ The internal implementation of StarRocks sink connector is 
cached and imported b
 
 ## Sink Options
 
-|            Name             |  Type   | Required |     Default     |         
                                                                                
           Description                                                          
                                          |
-|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| nodeUrls                    | list    | yes      | -               | 
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`        
                                                                                
                                                  |
-| base-url                    | string  | yes      | -               | The 
JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` 
or `jdbc:mysql://localhost:9030/db`                                             
                                                |
-| username                    | string  | yes      | -               | 
`StarRocks` user username                                                       
                                                                                
                                                  |
-| password                    | string  | yes      | -               | 
`StarRocks` user password                                                       
                                                                                
                                                  |
-| database                    | string  | yes      | -               | The 
name of StarRocks database                                                      
                                                                                
                                              |
-| table                       | string  | no       | -               | The 
name of StarRocks table, If not set, the table name will be the name of the 
upstream table                                                                  
                                                  |
-| labelPrefix                 | string  | no       | -               | The 
prefix of StarRocks stream load label                                           
                                                                                
                                              |
-| batch_max_rows              | long    | no       | 1024            | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
-| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
-| max_retries                 | int     | no       | -               | The 
number of retries to flush failed                                               
                                                                                
                                              |
-| retry_backoff_multiplier_ms | int     | no       | -               | Using 
as a multiplier for generating the next delay for backoff                       
                                                                                
                                            |
-| max_retry_backoff_ms        | int     | no       | -               | The 
amount of time to wait before attempting to retry a request to `StarRocks`      
                                                                                
                                              |
-| enable_upsert_delete        | boolean | no       | false           | Whether 
to enable upsert/delete, only supports PrimaryKey model.                        
                                                                                
                                          |
-| save_mode_create_template   | string  | no       | see below       | see 
below                                                                           
                                                                                
                                              |
-| starrocks.config            | map     | no       | -               | The 
parameter of the stream load `data_desc`                                        
                                                                                
                                              |
-| http_socket_timeout_ms      | int     | no       | 180000          | Set 
http socket timeout, default is 3 minutes.                                      
                                                                                
                                              |
+|            Name             |  Type   | Required |           Default         
   |                                                                            
                        Description                                             
                                                       |
+|-----------------------------|---------|----------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| nodeUrls                    | list    | yes      | -                         
   | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`   
                                                                                
                                                       |
+| base-url                    | string  | yes      | -                         
   | The JDBC URL like `jdbc:mysql://localhost:9030/` or 
`jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db`               
                                                                              |
+| username                    | string  | yes      | -                         
   | `StarRocks` user username                                                  
                                                                                
                                                       |
+| password                    | string  | yes      | -                         
   | `StarRocks` user password                                                  
                                                                                
                                                       |
+| database                    | string  | yes      | -                         
   | The name of StarRocks database                                             
                                                                                
                                                       |
+| table                       | string  | no       | -                         
   | The name of StarRocks table, If not set, the table name will be the name 
of the upstream table                                                           
                                                         |
+| labelPrefix                 | string  | no       | -                         
   | The prefix of StarRocks stream load label                                  
                                                                                
                                                       |
+| batch_max_rows              | long    | no       | 1024                      
   | For batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| batch_max_bytes             | int     | no       | 5 * 1024 * 1024           
   | For batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| max_retries                 | int     | no       | -                         
   | The number of retries to flush failed                                      
                                                                                
                                                       |
+| retry_backoff_multiplier_ms | int     | no       | -                         
   | Using as a multiplier for generating the next delay for backoff            
                                                                                
                                                       |
+| max_retry_backoff_ms        | int     | no       | -                         
   | The amount of time to wait before attempting to retry a request to 
`StarRocks`                                                                     
                                                               |
+| enable_upsert_delete        | boolean | no       | false                     
   | Whether to enable upsert/delete, only supports PrimaryKey model.           
                                                                                
                                                       |
+| save_mode_create_template   | string  | no       | see below                 
   | see below                                                                  
                                                                                
                                                       |
+| starrocks.config            | map     | no       | -                         
   | The parameter of the stream load `data_desc`                               
                                                                                
                                                       |
+| http_socket_timeout_ms      | int     | no       | 180000                    
   | Set http socket timeout, default is 3 minutes.                             
                                                                                
                                                       |
+| schema_save_mode            | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, 
different treatment schemes are selected for the existing surface structure of 
the target side.                                                                
         |
+| data_save_mode              | Enum    | no       | APPEND_DATA               
   | Before the synchronous task is turned on, different processing schemes are 
selected for data existing data on the target side.                             
                                                       |
+| custom_sql                  | String  | no       | -                         
   | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the 
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be 
executed. SQL will be executed before synchronization tasks.        |
 
 ### save_mode_create_template
 
@@ -45,6 +48,40 @@ We use templates to automatically create starrocks tables,
 which will create corresponding table creation statements based on the type of 
upstream data and schema type,
 and the default template can be modified according to the situation. Only work 
on multi-table mode at now.
 
+### table [string]
+
+Use `database` and this `table-name` auto-generate sql and receive upstream 
input datas write to database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
+The table parameter can fill in the name of an unwilling table, which will 
eventually be used as the table name of the creation table, and supports 
variables (`${table_name}`, `${schema_name}`). Replacement rules: 
`${schema_name}` will replace the SCHEMA name passed to the target side, and 
`${table_name}` will replace the name of the table passed to the table at the 
target side.
+
+for example:
+1. test_${schema_name}_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+### schema_save_mode[Enum]
+
+Before the synchronous task is turned on, different treatment schemes are 
selected for the existing surface structure of the target side.  
+Option introduction:  
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and 
rebuild when the table is saved        
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, 
skipped when the table is saved        
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not 
exist
+
+### data_save_mode[Enum]
+
+Before the synchronous task is turned on, different processing schemes are 
selected for data existing data on the target side.  
+Option introduction:  
+`DROP_DATA`: Preserve database structure and delete data  
+`APPEND_DATA`:Preserve database structure, preserve data  
+`CUSTOM_PROCESSING`:User defined processing  
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### custom_sql[String]
+
+When data_save_mode selects CUSTOM_PROCESSING, you should fill in the 
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be 
executed. SQL will be executed before synchronization tasks.
+
 ```sql
 CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
 (
@@ -222,6 +259,28 @@ sink {
 }
 ```
 
+### Use save_mode function
+
+```
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "test_${schema_name}_${table_name}"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode="APPEND_DATA"
+    batch_max_rows = 10
+    starrocks.config = {
+      format = "CSV"
+      column_separator = "\\x01"
+      row_delimiter = "\\x02"
+    }
+  }
+}
+```
+
 ## Changelog
 
 ### next version
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 0c37322199..3dc7eebfa6 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.mysql.cj.MysqlType;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -63,6 +65,7 @@ import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 
+@Slf4j
 public class StarRocksCatalog implements Catalog {
 
     protected final String catalogName;
@@ -72,6 +75,7 @@ public class StarRocksCatalog implements Catalog {
     protected final String baseUrl;
     protected String defaultUrl;
     private final JdbcUrlUtil.UrlInfo urlInfo;
+    private final String template;
 
     private static final Set<String> SYS_DATABASES = new HashSet<>();
     private static final Logger LOG = 
LoggerFactory.getLogger(StarRocksCatalog.class);
@@ -81,10 +85,10 @@ public class StarRocksCatalog implements Catalog {
         SYS_DATABASES.add("_statistics_");
     }
 
-    public StarRocksCatalog(String catalogName, String username, String pwd, 
String defaultUrl) {
+    public StarRocksCatalog(
+            String catalogName, String username, String pwd, String 
defaultUrl, String template) {
 
         checkArgument(StringUtils.isNotBlank(username));
-        checkArgument(StringUtils.isNotBlank(pwd));
         checkArgument(StringUtils.isNotBlank(defaultUrl));
         urlInfo = JdbcUrlUtil.getUrlInfo(defaultUrl);
         this.baseUrl = urlInfo.getUrlWithoutDatabase();
@@ -95,6 +99,7 @@ public class StarRocksCatalog implements Catalog {
         this.catalogName = catalogName;
         this.username = username;
         this.pwd = pwd;
+        this.template = template;
     }
 
     @Override
@@ -208,13 +213,64 @@ public class StarRocksCatalog implements Catalog {
     @Override
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
-        throw new UnsupportedOperationException();
+        this.createTable(
+                StarRocksSaveModeUtil.fillingCreateSql(
+                        template,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        table.getTableSchema()));
     }
 
     @Override
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            if (ignoreIfNotExists) {
+                conn.createStatement().execute("DROP TABLE IF EXISTS " + 
tablePath.getFullName());
+            } else {
+                conn.createStatement()
+                        .execute(String.format("DROP TABLE %s", 
tablePath.getFullName()));
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            if (ignoreIfNotExists) {
+                conn.createStatement()
+                        .execute(String.format("TRUNCATE TABLE  %s", 
tablePath.getFullName()));
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed TRUNCATE TABLE in catalog %s", 
tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    public void executeSql(TablePath tablePath, String sql) {
+        try (Connection connection = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            connection.createStatement().execute(sql);
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed EXECUTE SQL in 
catalog %s", sql), e);
+        }
+    }
+
+    public boolean isExistsData(TablePath tablePath) {
+        try (Connection connection = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            String sql = String.format("select * from %s limit 1", 
tablePath.getFullName());
+            ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
+            if (resultSet == null) {
+                return false;
+            }
+            return resultSet.next();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                    String.format("Failed Connection JDBC error %s", 
tablePath.getTableName()), e);
+        }
     }
 
     @Override
@@ -336,6 +392,7 @@ public class StarRocksCatalog implements Catalog {
     public void createTable(String sql)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            log.info("create table sql is :{}", sql);
             conn.createStatement().execute(sql);
         } catch (Exception e) {
             throw new CatalogException(
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 8204e0b539..94a93b3f56 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -36,7 +37,8 @@ public class StarRocksCatalogFactory implements 
CatalogFactory {
                 catalogName,
                 options.get(StarRocksOptions.USERNAME),
                 options.get(StarRocksOptions.PASSWORD),
-                options.get(StarRocksOptions.BASE_URL));
+                options.get(StarRocksOptions.BASE_URL),
+                options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 6862a87bc5..88b608dab3 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -58,7 +59,9 @@ public class SinkConfig implements Serializable {
 
     private String saveModeCreateTemplate;
 
+    private SchemaSaveMode schemaSaveMode;
     private DataSaveMode dataSaveMode;
+    private String customSql;
 
     private int httpSocketTimeout;
 
@@ -91,7 +94,9 @@ public class SinkConfig implements Serializable {
         config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
                 .ifPresent(sinkConfig::setColumnSeparator);
         sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
-        sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
+        
sinkConfig.setSchemaSaveMode(config.get(StarRocksSinkOptions.SCHEMA_SAVE_MODE));
+        
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE));
+        sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL));
         
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
         return sinkConfig;
     }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 6500d1474d..c6cd605063 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -19,12 +19,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
 import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -131,17 +129,29 @@ public interface StarRocksSinkOptions {
                     .enumType(StreamLoadFormat.class)
                     .defaultValue(StreamLoadFormat.JSON)
                     .withDescription("");
+    Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription(
+                            "different treatment schemes are selected for the 
existing surface structure of the target side");
 
-    SingleChoiceOption<DataSaveMode> SAVE_MODE =
-            Options.key(SupportSaveMode.DATA_SAVE_MODE_KEY)
-                    .singleChoice(DataSaveMode.class, 
Arrays.asList(DataSaveMode.APPEND_DATA))
+    Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .enumType(DataSaveMode.class)
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription(
-                            "Table structure and data processing methods that 
already exist on the target end");
+                            "different processing schemes are selected for 
data existing data on the target side");
 
     Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
             Options.key("http_socket_timeout_ms")
                     .intType()
                     .defaultValue(3 * 60 * 1000)
                     .withDescription("Set http socket timeout, default is 3 
minutes.");
+
+    Option<String> CUSTOM_SQL =
+            Options.key("custom_sql")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("when schema_save_mode selects 
CUSTOM_PROCESSING custom SQL");
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ee613dd972..b9040f72d4 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,10 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -38,15 +42,17 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     private SeaTunnelRowType seaTunnelRowType;
     private final SinkConfig sinkConfig;
-    private final DataSaveMode dataSaveMode;
-
+    private DataSaveMode dataSaveMode;
+    private SchemaSaveMode schemaSaveMode;
     private final CatalogTable catalogTable;
 
-    public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
+    public StarRocksSink(
+            SinkConfig sinkConfig, CatalogTable catalogTable, ReadonlyConfig 
readonlyConfig) {
         this.sinkConfig = sinkConfig;
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
         this.catalogTable = catalogTable;
         this.dataSaveMode = sinkConfig.getDataSaveMode();
+        this.schemaSaveMode = sinkConfig.getSchemaSaveMode();
     }
 
     @Override
@@ -54,27 +60,6 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         return StarRocksCatalogFactory.IDENTIFIER;
     }
 
-    private void autoCreateTable(String template) {
-        StarRocksCatalog starRocksCatalog =
-                new StarRocksCatalog(
-                        "StarRocks",
-                        sinkConfig.getUsername(),
-                        sinkConfig.getPassword(),
-                        sinkConfig.getJdbcUrl());
-        if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
-            
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""), 
true);
-        }
-        if (!starRocksCatalog.tableExists(
-                TablePath.of(sinkConfig.getDatabase(), 
sinkConfig.getTable()))) {
-            starRocksCatalog.createTable(
-                    StarRocksSaveModeUtil.fillingCreateSql(
-                            template,
-                            sinkConfig.getDatabase(),
-                            sinkConfig.getTable(),
-                            catalogTable.getTableSchema()));
-        }
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
@@ -82,18 +67,26 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
-        return Optional.empty();
-    }
-
-    /*@Override
-    public DataSaveMode getUserConfigSaveMode() {
-        return dataSaveMode;
+        TablePath tablePath =
+                TablePath.of(
+                        catalogTable.getTableId().getDatabaseName(),
+                        catalogTable.getTableId().getSchemaName(),
+                        catalogTable.getTableId().getTableName());
+        Catalog catalog =
+                new StarRocksCatalog(
+                        "StarRocks",
+                        sinkConfig.getUsername(),
+                        sinkConfig.getPassword(),
+                        sinkConfig.getJdbcUrl(),
+                        sinkConfig.getSaveModeCreateTemplate());
+        catalog.open();
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode,
+                        dataSaveMode,
+                        catalog,
+                        tablePath,
+                        catalogTable,
+                        sinkConfig.getCustomSql()));
     }
-
-    @Override
-    public void handleSaveMode(DataSaveMode saveMode) {
-        if (catalogTable != null) {
-            autoCreateTable(sinkConfig.getSaveModeCreateTemplate());
-        }
-    }*/
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 9c0a8b42d1..081645270f 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -31,6 +33,11 @@ import org.apache.commons.lang3.StringUtils;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY;
+import static 
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY;
+import static 
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;
+
 @AutoService(Factory.class)
 public class StarRocksSinkFactory implements TableSinkFactory {
     @Override
@@ -54,9 +61,14 @@ public class StarRocksSinkFactory implements 
TableSinkFactory {
                         StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
                         StarRocksSinkOptions.STARROCKS_CONFIG,
                         StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
-                        StarRocksSinkOptions.SAVE_MODE,
+                        StarRocksSinkOptions.SCHEMA_SAVE_MODE,
+                        StarRocksSinkOptions.DATA_SAVE_MODE,
                         StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
                         StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
+                .conditional(
+                        DATA_SAVE_MODE,
+                        DataSaveMode.CUSTOM_PROCESSING,
+                        StarRocksSinkOptions.CUSTOM_SQL)
                 .build();
     }
 
@@ -67,6 +79,47 @@ public class StarRocksSinkFactory implements 
TableSinkFactory {
         if (StringUtils.isBlank(sinkConfig.getTable())) {
             sinkConfig.setTable(catalogTable.getTableId().getTableName());
         }
-        return () -> new StarRocksSink(sinkConfig, catalogTable);
+        // get source table relevant information
+        TableIdentifier tableId = catalogTable.getTableId();
+        String sourceDatabaseName = tableId.getDatabaseName();
+        String sourceSchemaName = tableId.getSchemaName();
+        String sourceTableName = tableId.getTableName();
+        // get sink table relevant information
+        String sinkDatabaseName = sinkConfig.getDatabase();
+        String sinkTableName = sinkConfig.getTable();
+        // to replace
+        String finalDatabaseName =
+                sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, 
sourceDatabaseName);
+        String finalTableName = this.replaceFullTableName(sinkTableName, 
tableId);
+        // rebuild TableIdentifier and catalogTable
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        tableId.getCatalogName(), finalDatabaseName, null, 
finalTableName);
+        catalogTable =
+                CatalogTable.of(
+                        newTableId,
+                        catalogTable.getTableSchema(),
+                        catalogTable.getOptions(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getCatalogName());
+
+        CatalogTable finalCatalogTable = catalogTable;
+        // reset
+        sinkConfig.setTable(finalTableName);
+        sinkConfig.setDatabase(finalDatabaseName);
+        return () -> new StarRocksSink(sinkConfig, finalCatalogTable, 
context.getOptions());
+    }
+
+    private String replaceFullTableName(String original, TableIdentifier 
tableId) {
+        if (StringUtils.isNotBlank(tableId.getDatabaseName())) {
+            original = original.replace(REPLACE_DATABASE_NAME_KEY, 
tableId.getDatabaseName());
+        }
+        if (StringUtils.isNotBlank(tableId.getSchemaName())) {
+            original = original.replace(REPLACE_SCHEMA_NAME_KEY, 
tableId.getSchemaName());
+        }
+        if (StringUtils.isNotBlank(tableId.getTableName())) {
+            original = original.replace(REPLACE_TABLE_NAME_KEY, 
tableId.getTableName());
+        }
+        return original;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
index d692d85999..bdc7a15244 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
@@ -36,7 +36,7 @@ public class StarRocksCatalogTest {
     public void testCatalog() {
         StarRocksCatalog catalog =
                 new StarRocksCatalog(
-                        "starrocks", "root", "123456", 
"jdbc:mysql://47.108.65.163:9030/");
+                        "starrocks", "root", "123456", 
"jdbc:mysql://47.108.65.163:9030/", "");
         List<String> databases = catalog.listDatabases();
         LOGGER.info("find databases: " + databases);
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
index e76e5a7748..38e4f7eabe 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
@@ -25,6 +25,9 @@
     <artifactId>connector-starrocks-e2e</artifactId>
     <name>SeaTunnel : E2E : Connector V2 : StarRocks</name>
 
+    <properties>
+        <mysql.version>8.0.27</mysql.version>
+    </properties>
     <dependencies>
         <!-- SeaTunnel connectors -->
         <dependency>
@@ -39,5 +42,11 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index a0630d760a..1bd694b102 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.e2e.connector.starrocks;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -28,24 +31,20 @@ import 
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.math.BigDecimal;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.PreparedStatement;
@@ -64,18 +63,16 @@ import java.util.stream.Stream;
 import static org.awaitility.Awaitility.given;
 
 @Slf4j
-@Disabled("There are still errors unfixed @Hisoka-X")
 public class StarRocksIT extends TestSuiteBase implements TestResource {
     private static final String DOCKER_IMAGE = 
"d87904488/starrocks-starter:2.2.1";
     private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
     private static final String HOST = "starrocks_e2e";
     private static final int SR_DOCKER_PORT = 9030;
     private static final int SR_PORT = 9033;
-
-    private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
     private static final String USERNAME = "root";
     private static final String PASSWORD = "";
     private static final String DATABASE = "test";
+    private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
     private static final String SOURCE_TABLE = "e2e_table_source";
     private static final String SINK_TABLE = "e2e_table_sink";
     private static final String SR_DRIVER_JAR =
@@ -94,36 +91,7 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
                     + "  SMALLINT_COL   SMALLINT,\n"
                     + "  TINYINT_COL    TINYINT,\n"
                     + "  BOOLEAN_COL    BOOLEAN,\n"
-                    + "  DECIMAL_COL    DECIMAL,\n"
-                    + "  DOUBLE_COL     DOUBLE,\n"
-                    + "  FLOAT_COL      FLOAT,\n"
-                    + "  INT_COL        INT,\n"
-                    + "  CHAR_COL       CHAR,\n"
-                    + "  VARCHAR_11_COL VARCHAR(11),\n"
-                    + "  STRING_COL     STRING,\n"
-                    + "  DATETIME_COL   DATETIME,\n"
-                    + "  DATE_COL       DATE\n"
-                    + ")ENGINE=OLAP\n"
-                    + "DUPLICATE KEY(`BIGINT_COL`)\n"
-                    + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n"
-                    + "PROPERTIES (\n"
-                    + "\"replication_num\" = \"1\",\n"
-                    + "\"in_memory\" = \"false\","
-                    + "\"storage_format\" = \"DEFAULT\""
-                    + ")";
-
-    private static final String DDL_SINK =
-            "create table "
-                    + DATABASE
-                    + "."
-                    + SINK_TABLE
-                    + " (\n"
-                    + "  BIGINT_COL     BIGINT,\n"
-                    + "  LARGEINT_COL   LARGEINT,\n"
-                    + "  SMALLINT_COL   SMALLINT,\n"
-                    + "  TINYINT_COL    TINYINT,\n"
-                    + "  BOOLEAN_COL    BOOLEAN,\n"
-                    + "  DECIMAL_COL    DECIMAL,\n"
+                    + "  DECIMAL_COL    Decimal(12, 1),\n"
                     + "  DOUBLE_COL     DOUBLE,\n"
                     + "  FLOAT_COL      FLOAT,\n"
                     + "  INT_COL        INT,\n"
@@ -214,9 +182,9 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
                                 Short.parseShort("1"),
                                 Byte.parseByte("1"),
                                 Boolean.FALSE,
-                                BigDecimal.valueOf(2222243, 1),
+                                BigDecimal.valueOf(12345, 1),
                                 Double.parseDouble("2222243.2222243"),
-                                Float.parseFloat("222224"),
+                                Float.parseFloat("22.17"),
                                 Integer.parseInt("1"),
                                 "a",
                                 "VARCHAR_COL",
@@ -249,11 +217,14 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
         try {
             assertHasData(SINK_TABLE);
 
-            String sourceSql = String.format("select * from %s.%s", DATABASE, 
SOURCE_TABLE);
-            String sinkSql = String.format("select * from %s.%s", DATABASE, 
SINK_TABLE);
+            String sourceSql =
+                    String.format(
+                            "select * from %s.%s order by BIGINT_COL ", 
DATABASE, SOURCE_TABLE);
+            String sinkSql =
+                    String.format("select * from %s.%s order by BIGINT_COL ", 
DATABASE, SINK_TABLE);
             List<String> columnList =
                     Arrays.stream(COLUMN_STRING.split(","))
-                            .map(x -> x.trim())
+                            .map(String::trim)
                             .collect(Collectors.toList());
             Statement sourceStatement = jdbcConnection.createStatement();
             Statement sinkStatement = jdbcConnection.createStatement();
@@ -262,27 +233,19 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
             Assertions.assertEquals(
                     sourceResultSet.getMetaData().getColumnCount(),
                     sinkResultSet.getMetaData().getColumnCount());
+            log.info(container.getServerLogs());
             while (sourceResultSet.next()) {
                 if (sinkResultSet.next()) {
                     for (String column : columnList) {
                         Object source = sourceResultSet.getObject(column);
                         Object sink = sinkResultSet.getObject(column);
                         if (!Objects.deepEquals(source, sink)) {
-                            InputStream sourceAsciiStream = 
sourceResultSet.getBinaryStream(column);
-                            InputStream sinkAsciiStream = 
sinkResultSet.getBinaryStream(column);
-                            String sourceValue =
-                                    IOUtils.toString(sourceAsciiStream, 
StandardCharsets.UTF_8);
-                            String sinkValue =
-                                    IOUtils.toString(sinkAsciiStream, 
StandardCharsets.UTF_8);
-                            Assertions.assertEquals(sourceValue, sinkValue);
+                            Assertions.assertEquals(String.valueOf(source), 
String.valueOf(sink));
                         }
                     }
                 }
             }
-            // Check the row numbers is equal
-            sourceResultSet.last();
-            sinkResultSet.last();
-            Assertions.assertEquals(sourceResultSet.getRow(), 
sinkResultSet.getRow());
+            Assertions.assertFalse(sinkResultSet.next());
             clearSinkTable();
         } catch (Exception e) {
             throw new RuntimeException("get starRocks connection error", e);
@@ -310,7 +273,7 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
             // create source table
             statement.execute(DDL_SOURCE);
             // create sink table
-            statement.execute(DDL_SINK);
+            // statement.execute(DDL_SINK);
         } catch (SQLException e) {
             throw new RuntimeException("Initializing table failed!", e);
         }
@@ -354,4 +317,64 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
             throw new RuntimeException("test starrocks server image error", e);
         }
     }
+
+    @TestTemplate
+    public void testCatalog(TestContainer container) throws IOException, 
InterruptedException {
+        TablePath tablePathStarRocksSource = TablePath.of("test", 
"e2e_table_source");
+        TablePath tablePathStarRocksSink = TablePath.of("test", 
"e2e_table_source_2");
+        StarRocksCatalog starRocksCatalog =
+                new StarRocksCatalog(
+                        "StarRocks",
+                        "root",
+                        PASSWORD,
+                        String.format(URL, starRocksServer.getHost()),
+                        "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n  
DUPLICATE KEY(`BIGINT_COL`) \n  DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n 
PROPERTIES (\n   \"replication_num\" = \"1\", \n  \"in_memory\" = \"false\" , 
\n  \"storage_format\" = \"DEFAULT\"  \n )");
+        starRocksCatalog.open();
+        CatalogTable catalogTable = 
starRocksCatalog.getTable(tablePathStarRocksSource);
+        // sink tableExists ?
+        starRocksCatalog.dropTable(tablePathStarRocksSink, true);
+        boolean tableExistsBefore = 
starRocksCatalog.tableExists(tablePathStarRocksSink);
+        Assertions.assertFalse(tableExistsBefore);
+        // create table
+        starRocksCatalog.createTable(tablePathStarRocksSink, catalogTable, 
true);
+        boolean tableExistsAfter = 
starRocksCatalog.tableExists(tablePathStarRocksSink);
+        Assertions.assertTrue(tableExistsAfter);
+        // isExistsData ?
+        boolean existsDataBefore = 
starRocksCatalog.isExistsData(tablePathStarRocksSink);
+        Assertions.assertFalse(existsDataBefore);
+        // insert one data
+        String customSql =
+                "insert into "
+                        + DATABASE
+                        + "."
+                        + "e2e_table_source_2"
+                        + " (\n"
+                        + "  BIGINT_COL,\n"
+                        + "  LARGEINT_COL,\n"
+                        + "  SMALLINT_COL,\n"
+                        + "  TINYINT_COL,\n"
+                        + "  BOOLEAN_COL,\n"
+                        + "  DECIMAL_COL,\n"
+                        + "  DOUBLE_COL,\n"
+                        + "  FLOAT_COL,\n"
+                        + "  INT_COL,\n"
+                        + "  CHAR_COL,\n"
+                        + "  VARCHAR_11_COL,\n"
+                        + "  STRING_COL,\n"
+                        + "  DATETIME_COL,\n"
+                        + "  DATE_COL\n"
+                        + ")values(\n"
+                        + "\t 
999,12345,1,1,false,1.1,9.9,2.5,3,'A','ADC','ASEDF','2022-08-13 
17:35:59','2022-08-13'\n"
+                        + ")";
+        starRocksCatalog.executeSql(tablePathStarRocksSink, customSql);
+        boolean existsDataAfter = 
starRocksCatalog.isExistsData(tablePathStarRocksSink);
+        Assertions.assertTrue(existsDataAfter);
+        // truncateTable
+        starRocksCatalog.truncateTable(tablePathStarRocksSink, true);
+        
Assertions.assertFalse(starRocksCatalog.isExistsData(tablePathStarRocksSink));
+        // drop table
+        starRocksCatalog.dropTable(tablePathStarRocksSink, true);
+        
Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink));
+        starRocksCatalog.close();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index 54f6de4b6c..7b4c25af73 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -61,10 +61,14 @@ sink {
     table = "e2e_table_sink"
     batch_max_rows = 100
     max_retries = 3
-
+    base-url="jdbc:mysql://starrocks_e2e:9030/test"
     starrocks.config = {
       format = "JSON"
       strip_outer_array = true
     }
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="APPEND_DATA"
+    save_mode_create_template = "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n  
DUPLICATE KEY(`BIGINT_COL`) \n  DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n 
PROPERTIES (\n   \"replication_num\" = \"1\", \n  \"in_memory\" = \"false\" , 
\n  \"storage_format\" = \"DEFAULT\"  \n )"
+
   }
 }
\ No newline at end of file

Reply via email to