This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee0ead08 [INLONG-6747][Sort] StarRocks connector supports 
transferring all tables for all schemas in one database  (#6748)
5ee0ead08 is described below

commit 5ee0ead0825ce45663f71b8e03a81281be6c9511
Author: Liao Rui <[email protected]>
AuthorDate: Fri Dec 9 17:57:51 2022 +0800

    [INLONG-6747][Sort] StarRocks connector supports transferring all tables 
for all schemas in one database  (#6748)
    
    Co-authored-by: ryanrliao <[email protected]>
---
 .../src/main/assemblies/sort-connectors.xml        |   8 +
 .../{StarRocksConstant.java => Constant.java}      |  49 +-
 .../sort/protocol/constant/StarRocksConstant.java  |  18 -
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   4 +-
 .../sort/protocol/node/load/StarRocksLoadNode.java |  11 +-
 .../base/sink/SchemaUpdateExceptionPolicy.java     |   1 +
 inlong-sort/sort-connectors/pom.xml                |   1 +
 inlong-sort/sort-connectors/starrocks/pom.xml      |  83 +++
 .../starrocks/manager/StarRocksSinkManager.java    | 596 +++++++++++++++++++++
 .../manager/StarRocksStreamLoadVisitor.java        | 322 +++++++++++
 .../table/sink/StarRocksDynamicSinkFunction.java   | 342 ++++++++++++
 .../table/sink/StarRocksDynamicTableSink.java      | 101 ++++
 .../sink/StarRocksDynamicTableSinkFactory.java     | 149 ++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 licenses/inlong-sort-connectors/LICENSE            |   9 +
 17 files changed, 1648 insertions(+), 72 deletions(-)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors.xml
index 83a4eda7a..0f7c2f01e 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors.xml
@@ -163,6 +163,14 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            
<directory>../inlong-sort/sort-connectors/starrocks/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-starrocks-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
         <!-- module's 3td-licenses, notices-->
         <fileSet>
             <directory>../licenses/inlong-sort-connectors</directory>
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
similarity index 53%
copy from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
index 8813a4e92..b425cfea6 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
@@ -18,54 +18,9 @@
 package org.apache.inlong.sort.protocol.constant;
 
 /**
- * StarRocks options constant
+ * Common options constants shared across connectors.
  */
-public class StarRocksConstant {
-
-    /**
-     * 'connector' = 'starrocks-inlong'
-     */
-    public static final String CONNECTOR = "connector";
-
-    /**
-     * Host of the stream load like: 
`jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...`.
-     */
-    public static final String JDBC_URL = "jdbc-url";
-
-    /**
-     * Host of the stream load like: 
`fe_ip1:http_port;fe_ip2:http_port;fe_ip3:http_port`.
-     */
-    public static final String LOAD_URL = "load-url";
-
-    /**
-     * StarRocks user name.
-     */
-    public static final String USERNAME = "username";
-
-    /**
-     * StarRocks user password.
-     */
-    public static final String PASSWORD = "password";
-
-    /**
-     * StarRocks stream load format, support json and csv.
-     */
-    public static final String FORMAT = "sink.properties.format";
-
-    /**
-     * StarRocks stream load strip outer array for json format.
-     */
-    public static final String STRIP_OUTER_ARRAY = 
"sink.properties.strip_outer_array";
-
-    /**
-     * Database name of the stream load.
-     */
-    public static final String DATABASE_NAME = "database-name";
-
-    /**
-     * Table name of the stream load.
-     */
-    public static final String TABLE_NAME = "table-name";
+public class Constant {
 
     /**
      * The multiple enable of sink
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
index 8813a4e92..cfc716dc3 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
@@ -67,22 +67,4 @@ public class StarRocksConstant {
      */
     public static final String TABLE_NAME = "table-name";
 
-    /**
-     * The multiple enable of sink
-     */
-    public static final String SINK_MULTIPLE_ENABLE = "sink.multiple.enable";
-
-    /**
-     * The multiple format of sink
-     */
-    public static final String SINK_MULTIPLE_FORMAT = "sink.multiple.format";
-
-    /**
-     * The multiple database-pattern of sink
-     */
-    public static final String SINK_MULTIPLE_DATABASE_PATTERN = 
"sink.multiple.database-pattern";
-    /**
-     * The multiple table-pattern of sink
-     */
-    public static final String SINK_MULTIPLE_TABLE_PATTERN = 
"sink.multiple.table-pattern";
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index ad3a19baa..72cc42931 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -42,6 +42,7 @@ import 
org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -69,7 +70,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = 
"greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = 
"dlcIcebergLoad"),
-        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad")
+        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = 
"starRocksLoad")
 })
 @NoArgsConstructor
 @Data
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index dc1f6cf27..7f37075e0 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -47,6 +47,7 @@ import 
org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -88,7 +89,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = 
"greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = 
"dlcIcebergLoad"),
-        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad")
+        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = 
"starRocksLoad"),
 })
 public interface Node {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
index 7b7e1455c..1c64c7d44 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.constant.Constant;
 import org.apache.inlong.sort.protocol.constant.StarRocksConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -145,13 +146,13 @@ public class StarRocksLoadNode extends LoadNode 
implements InlongMetric, Seriali
         options.put(StarRocksConstant.DATABASE_NAME, databaseName);
         options.put(StarRocksConstant.TABLE_NAME, tableName);
         if (sinkMultipleEnable != null && sinkMultipleEnable) {
-            options.put(StarRocksConstant.SINK_MULTIPLE_ENABLE, 
sinkMultipleEnable.toString());
-            options.put(StarRocksConstant.SINK_MULTIPLE_FORMAT,
+            options.put(Constant.SINK_MULTIPLE_ENABLE, 
sinkMultipleEnable.toString());
+            options.put(Constant.SINK_MULTIPLE_FORMAT,
                     Objects.requireNonNull(sinkMultipleFormat).identifier());
-            options.put(StarRocksConstant.SINK_MULTIPLE_DATABASE_PATTERN, 
databasePattern);
-            options.put(StarRocksConstant.SINK_MULTIPLE_TABLE_PATTERN, 
tablePattern);
+            options.put(Constant.SINK_MULTIPLE_DATABASE_PATTERN, 
databasePattern);
+            options.put(Constant.SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
         } else {
-            options.put(StarRocksConstant.SINK_MULTIPLE_ENABLE, "false");
+            options.put(Constant.SINK_MULTIPLE_ENABLE, "false");
         }
         options.put(StarRocksConstant.FORMAT, "json");
         options.put(StarRocksConstant.STRIP_OUTER_ARRAY, "true");
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
index 82295591d..7a48eef85 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
@@ -33,6 +33,7 @@ public enum SchemaUpdateExceptionPolicy {
     TRY_IT_BEST("Try it best to handle schema update, if can not handle it, 
just ignore it."),
     LOG_WITH_IGNORE("Ignore schema update and log it."),
     ALERT_WITH_IGNORE("Ignore schema update and alert it."),
+    STOP_PARTIAL("Only stop the abnormal sink table; other tables write normally."),
     THROW_WITH_STOP("Throw exception to stop flink job when meet schema 
update.");
 
     private String description;
diff --git a/inlong-sort/sort-connectors/pom.xml 
b/inlong-sort/sort-connectors/pom.xml
index e40cb5a8a..801a401a9 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -54,6 +54,7 @@
         <module>tubemq</module>
         <module>filesystem</module>
         <module>doris</module>
+        <module>starrocks</module>
         <module>hudi</module>
     </modules>
 
diff --git a/inlong-sort/sort-connectors/starrocks/pom.xml 
b/inlong-sort/sort-connectors/starrocks/pom.xml
new file mode 100644
index 000000000..558a2054f
--- /dev/null
+++ b/inlong-sort/sort-connectors/starrocks/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>sort-connector-starrocks</artifactId>
+
+    <name>Apache InLong - Sort-connector-starrocks</name>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <starrocks-connector.version>1.2.3</starrocks-connector.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.starrocks</groupId>
+            <artifactId>flink-connector-starrocks</artifactId>
+            
<version>${starrocks-connector.version}_flink-${flink.minor.version}_${flink.scala.binary.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.base</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.starrocks.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
new file mode 100644
index 000000000..a6952eb6d
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.manager;
+
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import 
com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import 
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StarRocks sink manager which caches and flushes data.
+ */
+public class StarRocksSinkManager implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StarRocksSinkManager.class);
+
+    private final StarRocksJdbcConnectionProvider jdbcConnProvider;
+    private final StarRocksQueryVisitor starrocksQueryVisitor;
+    private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+    private final StarRocksSinkOptions sinkOptions;
+    final LinkedBlockingDeque<StarRocksSinkBufferEntity> flushQueue = new 
LinkedBlockingDeque<>(1);
+
+    private transient Counter totalFlushBytes;
+    private transient Counter totalFlushRows;
+    private transient Counter totalFlushTime;
+    private transient Counter totalFlushTimeWithoutRetries;
+    private transient Counter totalFlushSucceededTimes;
+    private transient Counter totalFlushFailedTimes;
+    private transient Histogram flushTimeNs;
+    private transient Histogram offerTimeNs;
+
+    private transient Counter totalFilteredRows;
+    private transient Histogram commitAndPublishTimeMs;
+    private transient Histogram streamLoadPutTimeMs;
+    private transient Histogram readDataTimeMs;
+    private transient Histogram writeDataTimeMs;
+    private transient Histogram loadTimeMs;
+
+    private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
+    private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
+    private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES 
= "totalFlushTimeNsWithoutRetries";
+    private static final String COUNTER_TOTAL_FLUSH_COST_TIME = 
"totalFlushTimeNs";
+    private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES = 
"totalFlushSucceededTimes";
+    private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES = 
"totalFlushFailedTimes";
+    private static final String HISTOGRAM_FLUSH_TIME = "flushTimeNs";
+    private static final String HISTOGRAM_OFFER_TIME_NS = "offerTimeNs";
+
+    // from stream load result
+    private static final String COUNTER_NUMBER_FILTERED_ROWS = 
"totalFilteredRows";
+    private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = 
"commitAndPublishTimeMs";
+    private static final String HISTOGRAM_STREAM_LOAD_PUT_TIME_MS = 
"streamLoadPutTimeMs";
+    private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
+    private static final String HISTOGRAM_WRITE_DATA_TIME_MS = 
"writeDataTimeMs";
+    private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
+
+    private final Map<String, StarRocksSinkBufferEntity> bufferMap = new 
ConcurrentHashMap<>();
+    private static final long FLUSH_QUEUE_POLL_TIMEOUT = 3000;
+    private volatile boolean closed = false;
+    private volatile boolean flushThreadAlive = false;
+    private volatile Throwable flushException;
+
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+
+    private final boolean multipleSink;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private transient SinkMetricData metricData;
+
+    /**
+     * If writing to a table throws an exception, ignore data received for that table later.
+     */
+    private Set<String> ignoreWriteTables = new HashSet<>();
+
+    public void setSinkMetricData(SinkMetricData metricData) {
+        this.metricData = metricData;
+    }
+
+    public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
+            TableSchema flinkSchema,
+            boolean multipleSink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+        this.sinkOptions = sinkOptions;
+        StarRocksJdbcConnectionOptions jdbcOptions = new 
StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(),
+                sinkOptions.getUsername(), sinkOptions.getPassword());
+        this.jdbcConnProvider = new 
StarRocksJdbcConnectionProvider(jdbcOptions);
+        this.starrocksQueryVisitor = new 
StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName());
+
+        this.multipleSink = multipleSink;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+
+        init(flinkSchema);
+    }
+
+    public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
+            TableSchema flinkSchema,
+            StarRocksJdbcConnectionProvider jdbcConnProvider,
+            StarRocksQueryVisitor starrocksQueryVisitor,
+            boolean multipleSink,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+        this.sinkOptions = sinkOptions;
+        this.jdbcConnProvider = jdbcConnProvider;
+        this.starrocksQueryVisitor = starrocksQueryVisitor;
+
+        this.multipleSink = multipleSink;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+
+        init(flinkSchema);
+    }
+
+    protected void init(TableSchema schema) {
+        if (!multipleSink) {
+            validateTableStructure(schema);
+        }
+        String version = starrocksQueryVisitor.getStarRocksVersion();
+        this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
+                sinkOptions,
+                null == schema ? new String[]{} : schema.getFieldNames(),
+                version.length() > 0 && !version.trim().startsWith("1."));
+    }
+
+    public void setRuntimeContext(RuntimeContext runtimeCtx) {
+        totalFlushBytes = 
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
+        totalFlushRows = 
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
+        totalFlushTime = 
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
+        totalFlushTimeWithoutRetries = runtimeCtx.getMetricGroup()
+                .counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
+        totalFlushSucceededTimes = 
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
+        totalFlushFailedTimes = 
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
+        flushTimeNs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        offerTimeNs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+
+        totalFilteredRows = 
runtimeCtx.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
+        commitAndPublishTimeMs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        streamLoadPutTimeMs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PUT_TIME_MS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        readDataTimeMs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        writeDataTimeMs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        loadTimeMs = 
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS,
+                new 
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+    }
+
+    public void startAsyncFlushing() {
+        // start flush thread
+        Thread flushThread = new Thread(() -> {
+            while (true) {
+                try {
+                    if (!asyncFlush()) {
+                        LOGGER.info("StarRocks flush thread is about to 
exit.");
+                        flushThreadAlive = false;
+                        break;
+                    }
+                } catch (Exception e) {
+                    flushException = e;
+                }
+            }
+        });
+
+        flushThread.setUncaughtExceptionHandler((t, e) -> {
+            LOGGER.error("StarRocks flush thread uncaught exception occurred: 
" + e.getMessage(), e);
+            flushException = e;
+            flushThreadAlive = false;
+        });
+        flushThread.setName("starrocks-flush");
+        flushThread.setDaemon(true);
+        flushThread.start();
+        flushThreadAlive = true;
+    }
+
+    public void startScheduler() throws IOException {
+        if 
(StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+            return;
+        }
+        stopScheduler();
+        this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("starrocks-interval-sink"));
+        this.scheduledFuture = this.scheduler.schedule(() -> {
+            synchronized (StarRocksSinkManager.this) {
+                if (!closed) {
+                    try {
+                        LOGGER.info("StarRocks interval Sinking triggered.");
+                        if (bufferMap.isEmpty()) {
+                            startScheduler();
+                        }
+                        flush(null, false);
+                    } catch (Exception e) {
+                        flushException = e;
+                    }
+                }
+            }
+        }, sinkOptions.getSinkMaxFlushInterval(), TimeUnit.MILLISECONDS);
+    }
+
+    public void stopScheduler() {
+        if (this.scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            this.scheduler.shutdown();
+        }
+    }
+
+    public final synchronized void writeRecords(String database, String table, 
String... records) throws IOException {
+        checkFlushException();
+        try {
+            if (0 == records.length) {
+                return;
+            }
+            String bufferKey = String.format("%s,%s", database, table);
+            StarRocksSinkBufferEntity bufferEntity = 
bufferMap.computeIfAbsent(bufferKey,
+                    k -> new StarRocksSinkBufferEntity(database, table, 
sinkOptions.getLabelPrefix()));
+            for (String record : records) {
+                byte[] bts = record.getBytes(StandardCharsets.UTF_8);
+                bufferEntity.addToBuffer(bts);
+            }
+            if 
(StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+                return;
+            }
+            if (bufferEntity.getBatchCount() >= sinkOptions.getSinkMaxRows()
+                    || bufferEntity.getBatchSize() >= 
sinkOptions.getSinkMaxBytes()) {
+                LOGGER.info(
+                        String.format("StarRocks buffer Sinking triggered: db: 
[%s] table: [%s] rows[%d] label[%s].",
+                                database, table, bufferEntity.getBatchCount(), 
bufferEntity.getLabel()));
+                flush(bufferKey, false);
+            }
+        } catch (Exception e) {
+            throw new IOException("Writing records to StarRocks failed.", e);
+        }
+    }
+
+    public synchronized void flush(String bufferKey, boolean waitUtilDone) 
throws Exception {
+        if (bufferMap.isEmpty()) {
+            flushInternal(null, waitUtilDone);
+            return;
+        }
+        if (null == bufferKey) {
+            for (String key : bufferMap.keySet()) {
+                flushInternal(key, waitUtilDone);
+            }
+            return;
+        }
+        flushInternal(bufferKey, waitUtilDone);
+    }
+
+    private synchronized void flushInternal(String bufferKey, boolean 
waitUtilDone) throws Exception {
+        checkFlushException();
+        if (null == bufferKey || bufferMap.isEmpty() || 
!bufferMap.containsKey(bufferKey)) {
+            if (waitUtilDone) {
+                waitAsyncFlushingDone();
+            }
+            return;
+        }
+        offer(bufferMap.get(bufferKey));
+        bufferMap.remove(bufferKey);
+        if (waitUtilDone) {
+            // wait the last flush
+            waitAsyncFlushingDone();
+        }
+    }
+
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+
+            LOGGER.info("StarRocks Sink is about to close.");
+            this.bufferMap.clear();
+
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                scheduler.shutdown();
+            }
+            if (jdbcConnProvider != null) {
+                jdbcConnProvider.close();
+            }
+
+            offerEOF();
+        }
+        checkFlushException();
+    }
+
+    public Map<String, StarRocksSinkBufferEntity> getBufferedBatchMap() {
+        Map<String, StarRocksSinkBufferEntity> clone = new HashMap<>();
+        clone.putAll(bufferMap);
+        return clone;
+    }
+
+    public void setBufferedBatchMap(Map<String, StarRocksSinkBufferEntity> 
bufferMap) throws IOException {
+        if 
(!StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+            return;
+        }
+        this.bufferMap.clear();
+        this.bufferMap.putAll(bufferMap);
+    }
+
    /**
     * Polls one buffered entity off the flush queue and streams it to StarRocks,
     * retrying up to {@code sinkOptions.getSinkMaxRetries()} times with a
     * capped linear backoff. Runs on the dedicated async flush thread.
     *
     * @return false if met eof and flush thread will exit.
     */
    private boolean asyncFlush() throws Exception {
        StarRocksSinkBufferEntity flushData = flushQueue.poll(FLUSH_QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
        // poll timeout or an empty placeholder (offered by waitAsyncFlushingDone): keep looping
        if (flushData == null || (0 == flushData.getBatchCount() && !flushData.EOF())) {
            return true;
        }
        // EOF sentinel (offered by offerEOF on close): tell the flush thread to exit
        if (flushData.EOF()) {
            return false;
        }
        stopScheduler();

        String tableIdentifier = flushData.getDatabase() + "." + flushData.getTable();

        // STOP_PARTIAL policy: a table that previously exhausted its retries is skipped forever
        if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == schemaUpdatePolicy && ignoreWriteTables.contains(
                tableIdentifier)) {
            LOGGER.warn(
                    String.format("Stop writing to db[%s] table[%s] because of former errors and stop_partial policy",
                            flushData.getDatabase(), flushData.getTable()));
            return true;
        }

        LOGGER.info(String.format("Async stream load: db[%s] table[%s] rows[%d] bytes[%d] label[%s].",
                flushData.getDatabase(), flushData.getTable(), flushData.getBatchCount(), flushData.getBatchSize(),
                flushData.getLabel()));
        long startWithRetries = System.nanoTime();
        for (int i = 0; i <= sinkOptions.getSinkMaxRetries(); i++) {
            try {
                long start = System.nanoTime();
                // flush to StarRocks with stream load
                Map<String, Object> result = starrocksStreamLoadVisitor.doStreamLoad(flushData);
                LOGGER.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
                // metrics (totalFlushBytes doubles as the "metrics enabled" flag)
                if (null != totalFlushBytes) {
                    totalFlushBytes.inc(flushData.getBatchSize());
                    totalFlushRows.inc(flushData.getBatchCount());
                    totalFlushTime.inc(System.nanoTime() - startWithRetries);
                    totalFlushTimeWithoutRetries.inc(System.nanoTime() - start);
                    totalFlushSucceededTimes.inc();
                    flushTimeNs.update(System.nanoTime() - start);
                    updateMetricsFromStreamLoadResult(result);

                    if (null != metricData) {
                        metricData.invoke(flushData.getBatchCount(), flushData.getBatchSize());
                    }
                }
                startScheduler();
                break;
            } catch (Exception e) {
                if (totalFlushFailedTimes != null) {
                    totalFlushFailedTimes.inc();
                }
                LOGGER.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);
                if (i >= sinkOptions.getSinkMaxRetries()) {
                    // retries exhausted: behavior now depends on the schema-update policy
                    if (schemaUpdatePolicy == null
                            || schemaUpdatePolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
                        throw e;
                    } else if (schemaUpdatePolicy == SchemaUpdateExceptionPolicy.STOP_PARTIAL) {
                        // blacklist this table; subsequent batches for it are skipped above
                        ignoreWriteTables.add(tableIdentifier);
                    }
                }
                // a duplicate/aborted label cannot be reused: regenerate before retrying
                if (e instanceof StarRocksStreamLoadFailedException
                        && ((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
                    String oldLabel = flushData.getLabel();
                    flushData.reGenerateLabel();
                    LOGGER.warn(String.format("Batch label changed from [%s] to [%s]", oldLabel, flushData.getLabel()));
                }
                try {
                    // linear backoff capped at 10s
                    Thread.sleep(1000L * Math.min(i + 1, 10));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
        return true;
    }
+
+    private void waitAsyncFlushingDone() throws InterruptedException {
+        // wait for previous flushings
+        offer(new StarRocksSinkBufferEntity(null, null, null));
+        offer(new StarRocksSinkBufferEntity(null, null, null));
+        checkFlushException();
+    }
+
+    void offer(StarRocksSinkBufferEntity bufferEntity) throws 
InterruptedException {
+        if (!flushThreadAlive) {
+            LOGGER.info(String.format("Flush thread already exit, ignore offer 
request for label[%s]",
+                    bufferEntity.getLabel()));
+            return;
+        }
+
+        long start = System.nanoTime();
+        if (!flushQueue.offer(bufferEntity, sinkOptions.getSinkOfferTimeout(), 
TimeUnit.MILLISECONDS)) {
+            throw new RuntimeException(
+                    "Timeout while offering data to flushQueue, exceed " + 
sinkOptions.getSinkOfferTimeout()
+                            + " ms, see " + 
StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT.key());
+        }
+        if (offerTimeNs != null) {
+            offerTimeNs.update(System.nanoTime() - start);
+        }
+    }
+
+    private void offerEOF() {
+        try {
+            offer(new StarRocksSinkBufferEntity(null, null, null).asEOF());
+        } catch (Exception e) {
+            LOGGER.warn("Writing EOF failed.", e);
+        }
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+            for (int i = 0; i < stack.length; i++) {
+                LOGGER.info(
+                        stack[i].getClassName() + "." + 
stack[i].getMethodName() + " line:" + stack[i].getLineNumber());
+            }
+            throw new RuntimeException("Writing records to StarRocks failed.", 
flushException);
+        }
+    }
+
+    private void validateTableStructure(TableSchema flinkSchema) {
+        if (null == flinkSchema) {
+            return;
+        }
+        Optional<UniqueConstraint> constraint = flinkSchema.getPrimaryKey();
+        List<Map<String, Object>> rows = 
starrocksQueryVisitor.getTableColumnsMetaData();
+        if (rows == null || rows.isEmpty()) {
+            throw new IllegalArgumentException("Couldn't get the sink table's 
column info.");
+        }
+        // validate primary keys
+        List<String> primayKeys = new ArrayList<>();
+        for (int i = 0; i < rows.size(); i++) {
+            String keysType = rows.get(i).get("COLUMN_KEY").toString();
+            if (!"PRI".equals(keysType)) {
+                continue;
+            }
+            
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
+        }
+        if (!primayKeys.isEmpty()) {
+            if (!constraint.isPresent()) {
+                throw new IllegalArgumentException("Primary keys not defined 
in the sink `TableSchema`.");
+            }
+            if (constraint.get().getColumns().size() != primayKeys.size() || 
!constraint.get().getColumns().stream()
+                    .allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
+                throw new IllegalArgumentException(
+                        "Primary keys of the flink `TableSchema` do not match 
with the ones from starrocks table.");
+            }
+            sinkOptions.enableUpsertDelete();
+        }
+
+        if (sinkOptions.hasColumnMappingProperty()) {
+            return;
+        }
+        if (flinkSchema.getFieldCount() != rows.size()) {
+            throw new IllegalArgumentException(
+                    "Fields count of " + this.sinkOptions.getTableName() + " 
mismatch. \nflinkSchema["
+                            + flinkSchema.getFieldNames().length + "]:"
+                            + 
Arrays.asList(flinkSchema.getFieldNames()).stream().collect(Collectors.joining(","))
+                            + "\n realTab[" + rows.size() + "]:"
+                            + rows.stream().map((r) -> 
String.valueOf(r.get("COLUMN_NAME")))
+                                    .collect(Collectors.joining(",")));
+        }
+        List<TableColumn> flinkCols = flinkSchema.getTableColumns();
+        for (int i = 0; i < rows.size(); i++) {
+            String starrocksField = 
rows.get(i).get("COLUMN_NAME").toString().toLowerCase();
+            String starrocksType = 
rows.get(i).get("DATA_TYPE").toString().toLowerCase();
+            List<TableColumn> matchedFlinkCols = flinkCols.stream()
+                    .filter(col -> 
col.getName().toLowerCase().equals(starrocksField)
+                            && (!typesMap.containsKey(starrocksType) || 
typesMap.get(starrocksType)
+                                    
.contains(col.getType().getLogicalType().getTypeRoot())))
+                    .collect(Collectors.toList());
+            if (matchedFlinkCols.isEmpty()) {
+                throw new IllegalArgumentException("Fields name or type 
mismatch for:" + starrocksField);
+            }
+        }
+    }
+
+    private void updateMetricsFromStreamLoadResult(Map<String, Object> result) 
{
+        if (result != null) {
+            updateHisto(result, "CommitAndPublishTimeMs", 
this.commitAndPublishTimeMs);
+            updateHisto(result, "StreamLoadPutTimeMs", 
this.streamLoadPutTimeMs);
+            updateHisto(result, "ReadDataTimeMs", this.readDataTimeMs);
+            updateHisto(result, "WriteDataTimeMs", this.writeDataTimeMs);
+            updateHisto(result, "LoadTimeMs", this.loadTimeMs);
+            updateCounter(result, "NumberFilteredRows", 
this.totalFilteredRows);
+        }
+    }
+
+    private void updateCounter(Map<String, Object> result, String key, Counter 
counter) {
+        if (result.containsKey(key)) {
+            Object val = result.get(key);
+            if (val != null) {
+                try {
+                    long longValue = Long.parseLong(val.toString());
+                    counter.inc(longValue);
+                } catch (Exception e) {
+                    LOGGER.warn("Parse stream load result metric error", e);
+                }
+            }
+        }
+    }
+
+    private void updateHisto(Map<String, Object> result, String key, Histogram 
histogram) {
+        if (result.containsKey(key)) {
+            Object val = result.get(key);
+            if (val != null) {
+                try {
+                    long longValue = Long.parseLong(val.toString());
+                    histogram.update(longValue);
+                } catch (Exception e) {
+                    LOGGER.warn("Parse stream load result metric error", e);
+                }
+            }
+        }
+    }
+
+    private static final Map<String, List<LogicalTypeRoot>> typesMap = new 
HashMap<>();
+
+    static {
+        // validate table structure
+        typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, 
LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, 
LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER,
+                LogicalTypeRoot.BINARY));
+        typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, 
LogicalTypeRoot.VARCHAR));
+        typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, 
LogicalTypeRoot.VARCHAR));
+        typesMap.put("datetime", 
Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 
LogicalTypeRoot.VARCHAR));
+        typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, 
LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER,
+                LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
+        typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, 
LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
+        typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, 
LogicalTypeRoot.INTEGER));
+        typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, 
LogicalTypeRoot.BINARY));
+        typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, 
LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY,
+                LogicalTypeRoot.BOOLEAN));
+        typesMap.put("smallint",
+                Arrays.asList(LogicalTypeRoot.SMALLINT, 
LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, 
LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP,
+                LogicalTypeRoot.ROW));
+        typesMap.put("string",
+                Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, 
LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP,
+                        LogicalTypeRoot.ROW));
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
new file mode 100644
index 000000000..53ca016c3
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.manager;
+
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import 
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException;
+import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser;
+import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.shade.com.alibaba.fastjson.JSON;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StarRocksStreamLoadVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
+
+    private static final int ERROR_LOG_MAX_LENGTH = 3000;
+
+    private final StarRocksSinkOptions sinkOptions;
+    private final String[] fieldNames;
+    private long pos;
+    private boolean opAutoProjectionInJson;
+    private static final String RESULT_FAILED = "Fail";
+    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
+    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
+    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
+    private static final String RESULT_LABEL_PREPARE = "PREPARE";
+    private static final String RESULT_LABEL_ABORTED = "ABORTED";
+    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
+
+    public StarRocksStreamLoadVisitor(StarRocksSinkOptions sinkOptions, 
String[] fieldNames,
+            boolean opAutoProjectionInJson) {
+        this.fieldNames = fieldNames;
+        this.sinkOptions = sinkOptions;
+        this.opAutoProjectionInJson = opAutoProjectionInJson;
+    }
+
+    public Map<String, Object> doStreamLoad(StarRocksSinkBufferEntity 
bufferEntity) throws IOException {
+        String host = getAvailableHost();
+        if (null == host) {
+            throw new IOException("None of the hosts in `load_url` could be 
connected.");
+        }
+        String loadUrl = new 
StringBuilder(host).append("/api/").append(bufferEntity.getDatabase()).append("/")
+                
.append(bufferEntity.getTable()).append("/_stream_load").toString();
+        LOG.info(String.format("Start to join batch data: label[%s].", 
bufferEntity.getLabel()));
+        Map<String, Object> loadResult = doHttpPut(loadUrl, 
bufferEntity.getLabel(),
+                joinRows(bufferEntity.getBuffer(), (int) 
bufferEntity.getBatchSize()));
+        final String keyStatus = "Status";
+        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+            throw new IOException(
+                    "Unable to flush data to StarRocks: unknown result status, 
usually caused by: "
+                            + "1.authorization or permission related problems. 
"
+                            + "2.Wrong column_separator or row_delimiter. "
+                            + "3.Column count exceeded the limitation.");
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Stream Load response: \n%s\n", 
JSON.toJSONString(loadResult)));
+        }
+        if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
+            Map<String, String> logMap = new HashMap<>();
+            if (loadResult.containsKey("ErrorURL")) {
+                logMap.put("streamLoadErrorLog", getErrorLog((String) 
loadResult.get("ErrorURL")));
+            }
+            throw new StarRocksStreamLoadFailedException(
+                    String.format("Failed to flush data to StarRocks, Error " 
+ "response: \n%s\n%s\n",
+                            JSON.toJSONString(loadResult), 
JSON.toJSONString(logMap)),
+                    loadResult);
+        } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
+            LOG.error(String.format("Stream Load response: \n%s\n", 
JSON.toJSONString(loadResult)));
+            // has to block-checking the state to get the final result
+            checkLabelState(host, bufferEntity.getDatabase(), 
bufferEntity.getLabel());
+        }
+        return loadResult;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkLabelState(String host, String database, String label) 
throws IOException {
+        int idx = 0;
+        while (true) {
+            try {
+                TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
+            } catch (InterruptedException ex) {
+                break;
+            }
+            try (CloseableHttpClient httpclient = HttpClients.createDefault()) 
{
+                HttpGet httpGet = new HttpGet(
+                        new 
StringBuilder(host).append("/api/").append(database)
+                                
.append("/get_load_state?label=").append(label).toString());
+                httpGet.setHeader("Authorization",
+                        getBasicAuthHeader(sinkOptions.getUsername(), 
sinkOptions.getPassword()));
+                httpGet.setHeader("Connection", "close");
+
+                try (CloseableHttpResponse resp = httpclient.execute(httpGet)) 
{
+                    HttpEntity respEntity = getHttpEntity(resp);
+                    if (respEntity == null) {
+                        throw new 
StarRocksStreamLoadFailedException(String.format(
+                                "Failed to flush data to StarRocks, Error "
+                                        + "could not get the final state of 
label[%s].\n",
+                                label), null);
+                    }
+                    Map<String, Object> result = (Map<String, Object>) 
JSON.parse(EntityUtils.toString(respEntity));
+                    String labelState = (String) result.get("state");
+                    if (null == labelState) {
+                        throw new 
StarRocksStreamLoadFailedException(String.format(
+                                "Failed to flush data to StarRocks, Error "
+                                        + "could not get the final state of 
label[%s]. response[%s]\n",
+                                label,
+                                EntityUtils.toString(respEntity)), null);
+                    }
+                    LOG.info(String.format("Checking label[%s] state[%s]\n", 
label, labelState));
+                    switch (labelState) {
+                        case LAEBL_STATE_VISIBLE:
+                        case LAEBL_STATE_COMMITTED:
+                            return;
+                        case RESULT_LABEL_PREPARE:
+                            continue;
+                        case RESULT_LABEL_ABORTED:
+                            throw new StarRocksStreamLoadFailedException(
+                                    String.format("Failed to flush data to 
StarRocks, Error " + "label[%s] state[%s]\n",
+                                            label, labelState),
+                                    null, true);
+                        case RESULT_LABEL_UNKNOWN:
+                        default:
+                            throw new StarRocksStreamLoadFailedException(
+                                    String.format("Failed to flush data to 
StarRocks, Error " + "label[%s] state[%s]\n",
+                                            label, labelState),
+                                    null);
+                    }
+                }
+            }
+        }
+    }
+
+    private String getErrorLog(String errorUrl) {
+        if (errorUrl == null || errorUrl.isEmpty() || 
!errorUrl.startsWith("http")) {
+            return null;
+        }
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(errorUrl);
+            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
+                HttpEntity respEntity = getHttpEntity(resp);
+                if (respEntity == null) {
+                    return null;
+                }
+                String errorLog = EntityUtils.toString(respEntity);
+                if (errorLog != null && errorLog.length() > 
ERROR_LOG_MAX_LENGTH) {
+                    errorLog = errorLog.substring(0, ERROR_LOG_MAX_LENGTH);
+                }
+                return errorLog;
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get error log.", e);
+            return "Failed to get error log: " + e.getMessage();
+        }
+    }
+
+    private String getAvailableHost() {
+        List<String> hostList = sinkOptions.getLoadUrlList();
+        long tmp = pos + hostList.size();
+        for (; pos < tmp; pos++) {
+            String host = new 
StringBuilder("http://";).append(hostList.get((int) (pos % 
hostList.size()))).toString();
+            if (tryHttpConnection(host)) {
+                return host;
+            }
+        }
+        return null;
+    }
+
+    private boolean tryHttpConnection(String host) {
+        try {
+            URL url = new URL(host);
+            HttpURLConnection co = (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(sinkOptions.getConnectTimeout());
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception e1) {
+            LOG.warn("Failed to connect to address:{}", host, e1);
+            return false;
+        }
+    }
+
+    private byte[] joinRows(List<byte[]> rows, int totalBytes) throws 
IOException {
+        if 
(StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))
 {
+            byte[] lineDelimiter = StarRocksDelimiterParser.parse(
+                    
sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n")
+                    .getBytes(StandardCharsets.UTF_8);
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * 
lineDelimiter.length);
+            for (byte[] row : rows) {
+                bos.put(row);
+                bos.put(lineDelimiter);
+            }
+            return bos.array();
+        }
+
+        if 
(StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat()))
 {
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() 
? 2 : rows.size() + 1));
+            bos.put("[".getBytes(StandardCharsets.UTF_8));
+            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+            boolean isFirstElement = true;
+            for (byte[] row : rows) {
+                if (!isFirstElement) {
+                    bos.put(jsonDelimiter);
+                }
+                bos.put(row);
+                isFirstElement = false;
+            }
+            bos.put("]".getBytes(StandardCharsets.UTF_8));
+            return bos.array();
+        }
+        throw new RuntimeException("Failed to join rows data, unsupported 
`format` from stream load properties:");
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] 
data) throws IOException {
+        LOG.info(String.format("Executing stream load to: '%s', size: '%s', 
thread: %d", loadUrl, data.length,
+                Thread.currentThread().getId()));
+        final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                });
+        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+            HttpPut httpPut = new HttpPut(loadUrl);
+            Map<String, String> props = 
sinkOptions.getSinkStreamLoadProperties();
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                httpPut.setHeader(entry.getKey(), entry.getValue());
+            }
+            if (!props.containsKey("columns") && 
((sinkOptions.supportUpsertDelete() && !opAutoProjectionInJson)
+                    || 
StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())))
 {
+                String cols = String.join(",",
+                        Arrays.asList(fieldNames).stream().map(f -> 
String.format("`%s`", f.trim().replace("`", "")))
+                                .collect(Collectors.toList()));
+                if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
+                    cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY);
+                }
+                httpPut.setHeader("columns", cols);
+            }
+            if (!httpPut.containsHeader("timeout")) {
+                httpPut.setHeader("timeout", "60");
+            }
+            httpPut.setHeader("Expect", "100-continue");
+            httpPut.setHeader("label", label);
+            httpPut.setHeader("Authorization",
+                    getBasicAuthHeader(sinkOptions.getUsername(), 
sinkOptions.getPassword()));
+            httpPut.setEntity(new ByteArrayEntity(data));
+            
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+                HttpEntity respEntity = getHttpEntity(resp);
+                if (respEntity == null) {
+                    return null;
+                }
+                return (Map<String, Object>) 
JSON.parse(EntityUtils.toString(respEntity));
+            }
+        }
+    }
+
+    private String getBasicAuthHeader(String username, String password) {
+        String auth = username + ":" + password;
+        byte[] encodedAuth = 
Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+        return new StringBuilder("Basic ").append(new 
String(encodedAuth)).toString();
+    }
+
+    private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
+        int code = resp.getStatusLine().getStatusCode();
+        if (200 != code) {
+            LOG.warn("Request failed with code:{}", code);
+            return null;
+        }
+        HttpEntity respEntity = resp.getEntity();
+        if (null == respEntity) {
+            LOG.warn("Request failed with empty response.");
+            return null;
+        }
+        return respEntity;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
new file mode 100644
index 000000000..93a1363ff
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import 
com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.shade.com.alibaba.fastjson.JSON;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.NestedRowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.starrocks.manager.StarRocksSinkManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * StarRocks sink function supporting two modes:
 * <ul>
 *   <li>single-table mode: each record is serialized by a {@link StarRocksISerializer}
 *       and written to the database/table configured in {@link StarRocksSinkOptions};</li>
 *   <li>multiple-sink mode: each record is a raw JSON change event; the target database
 *       and table are extracted from the event via the configured patterns, so one sink
 *       instance can feed all tables of a database.</li>
 * </ul>
 * The function also reports InLong metrics/audit, and for EXACTLY_ONCE semantics it
 * snapshots the pending batch into operator state at every checkpoint.
 *
 * @param <T> the record type handed to {@link #invoke(Object, Context)}
 */
public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSinkFunction.class);

    // Buffers records and flushes them to StarRocks via stream load.
    private StarRocksSinkManager sinkManager;
    // Converts Flink rows into the wire representation; only used in single-table mode.
    private StarRocksIRowTransformer<T> rowTransformer;
    private StarRocksSinkOptions sinkOptions;
    // When null, invoke() treats incoming values as pre-serialized strings/metadata.
    private StarRocksISerializer serializer;
    private transient Counter totalInvokeRowsTime;
    private transient Counter totalInvokeRows;
    private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
    private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";

    /**
     * Operator state holding the buffered batch per table key. Only populated when the
     * semantic is {@code StarRocksSinkSemantic.EXACTLY_ONCE}; otherwise it stays unused.
     */
    private transient ListState<Map<String, StarRocksSinkBufferEntity>> checkpointedState;

    // Multiple-sink (whole-database) configuration; patterns are resolved per record.
    private final boolean multipleSink;
    private final String sinkMultipleFormat;
    private final String databasePattern;
    private final String tablePattern;

    // InLong metric label string; also gates creation of metricStateListState.
    private final String inlongMetric;
    private transient SinkMetricData metricData;
    private transient ListState<MetricState> metricStateListState;
    private transient MetricState metricState;
    private final String auditHostAndPorts;

    // Lazily created in invoke(); parses the raw JSON change events in multiple-sink mode.
    private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;

    /**
     * Creates the sink function and eagerly wires up the JDBC-based query visitor,
     * the sink manager and (for single-table mode) the row serializer.
     *
     * <p>Note: this constructor opens a JDBC connection provider and queries the field
     * mapping, so it performs I/O at job-graph construction time.
     *
     * @param sinkOptions        connection and batching options for StarRocks
     * @param schema             the Flink table schema of the sink
     * @param rowTransformer     converts rows of type T for serialization
     * @param multipleSink       whether whole-database (multiple-table) mode is enabled
     * @param sinkMultipleFormat dynamic schema format id used to decode raw change events
     * @param databasePattern    pattern used to extract the target database from an event
     * @param tablePattern       pattern used to extract the target table from an event
     * @param inlongMetric       InLong metric labels, may be null/empty to disable
     * @param auditHostAndPorts  InLong audit endpoints, may be null
     * @param schemaUpdatePolicy policy applied when a schema change is encountered
     */
    public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions,
            TableSchema schema,
            StarRocksIRowTransformer<T> rowTransformer,
            boolean multipleSink,
            String sinkMultipleFormat,
            String databasePattern,
            String tablePattern,
            String inlongMetric,
            String auditHostAndPorts,
            SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
        StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(),
                sinkOptions.getUsername(), sinkOptions.getPassword());
        StarRocksJdbcConnectionProvider jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
        StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider,
                sinkOptions.getDatabaseName(), sinkOptions.getTableName());
        this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor,
                multipleSink, schemaUpdatePolicy);

        // Field mapping comes from a live query against StarRocks system tables.
        rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
        rowTransformer.setTableSchema(schema);
        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
        this.rowTransformer = rowTransformer;
        this.sinkOptions = sinkOptions;

        this.multipleSink = multipleSink;
        this.sinkMultipleFormat = sinkMultipleFormat;
        this.databasePattern = databasePattern;
        this.tablePattern = tablePattern;
        this.inlongMetric = inlongMetric;
        this.auditHostAndPorts = auditHostAndPorts;
    }

    /**
     * Registers counters, starts the sink manager's flush scheduler/async thread and
     * initializes InLong metric reporting (restoring counts from {@link #metricState}
     * when the job was restored from a checkpoint).
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sinkManager.setRuntimeContext(getRuntimeContext());
        totalInvokeRows = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS);
        totalInvokeRowsTime = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS_COST_TIME);
        if (null != rowTransformer) {
            rowTransformer.setRuntimeContext(getRuntimeContext());
        }
        sinkManager.startScheduler();
        sinkManager.startAsyncFlushing();

        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
                .withInlongAudit(auditHostAndPorts)
                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                .withRegisterMetric(MetricOption.RegisteredMetric.ALL).build();
        if (metricOption != null) {
            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
            sinkManager.setSinkMetricData(metricData);
        }
    }

    /**
     * Routes one record to the sink manager. Synchronized together with
     * {@link #snapshotState}, {@link #finish()} and {@link #close()} so buffer
     * manipulation stays single-threaded with respect to checkpointing.
     *
     * <p>Dispatch order: pre-serialized values (null serializer) → DDL probe on
     * {@code NestedRowData} → RowKind filtering → multiple-sink JSON path or
     * single-table serialization.
     */
    @Override
    public synchronized void invoke(T value, Context context) throws Exception {
        long start = System.nanoTime();
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            // Under exactly-once, data buffered before the last checkpoint must be
            // flushed before accepting new records.
            flushPreviousState();
        }
        if (null == serializer) {
            if (value instanceof StarRocksSinkRowDataWithMeta) {
                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
                if (Strings.isNullOrEmpty(data.getDatabase()) || Strings.isNullOrEmpty(data.getTable())
                        || null == data.getDataRows()) {
                    LOG.warn(String.format("json row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
                            data.getDatabase(), data.getTable(), data.getDataRows()));
                    return;
                }
                // NOTE(review): this path returns without incrementing totalInvokeRows /
                // totalInvokeRowsTime, unlike the raw-data path below — confirm intended.
                sinkManager.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
                return;
            }
            // raw data sink: the value is already a serialized string
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), (String) value);
            totalInvokeRows.inc(1);
            totalInvokeRowsTime.inc(System.nanoTime() - start);
            return;
        }
        if (value instanceof NestedRowData) {
            // assumes a 256-byte header precedes the serialized DDL map in the single
            // memory segment — TODO confirm against the upstream producer's layout
            final int headerSize = 256;
            NestedRowData ddlData = (NestedRowData) value;
            if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < headerSize) {
                return;
            }
            int totalSize = ddlData.getSegments()[0].size();
            byte[] data = new byte[totalSize - headerSize];
            ddlData.getSegments()[0].get(headerSize, data);
            Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
            // Skip snapshot-phase events and events missing the DDL text or database name.
            if (null == ddlMap || "true".equals(ddlMap.get("snapshot")) || Strings.isNullOrEmpty(ddlMap.get("ddl"))
                    || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
                return;
            }
            Statement stmt = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
            if (stmt instanceof Truncate) {
                Truncate truncate = (Truncate) stmt;
                if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
                    return;
                }
                // TODO: add ddl to queue
            } else if (stmt instanceof Alter) {
                // NOTE(review): ALTER statements are parsed but not handled; the local
                // variable below is unused — placeholder for future schema-change support.
                Alter alter = (Alter) stmt;
            }
        }
        if (value instanceof RowData) {
            if (RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
                // UPDATE_BEFORE is not needed: an update on the primary keys is split
                // into a `delete` followed by a `create`
                return;
            }
            if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) value).getRowKind())) {
                // For tables with `unique` or `duplicate` keys only UPDATE_AFTER and
                // INSERT rows pass through; DELETE rows are dropped.
                return;
            }
        }

        if (multipleSink) {
            // Multiple-sink mode: the row carries one raw JSON change event in field 0.
            GenericRowData rowData = (GenericRowData) value;
            if (jsonDynamicSchemaFormat == null) {
                jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(
                        this.sinkMultipleFormat);
            }
            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize((byte[]) rowData.getField(0));
            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
            if (isDDL) {
                // Ignore ddl change for now
                return;
            }
            // Resolve the physical target from the event itself via the configured patterns.
            String databaseName = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
            String tableName = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);

            List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
                    jsonDynamicSchemaFormat.getOpType(rootNode));
            List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
                    jsonDynamicSchemaFormat.getPhysicalData(rootNode));
            JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
            List<Map<String, String>> updateBeforeList = null;
            if (updateBeforeNode != null) {
                updateBeforeList = jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
            }
            // One event may expand to several rows; each row is tagged with StarRocks'
            // "__op" column (upsert/delete ordinal) and written as a JSON record.
            for (int i = 0; i < physicalDataList.size(); i++) {
                for (RowKind rowKind : rowKinds) {
                    String record = null;
                    switch (rowKind) {
                        case INSERT:
                        case UPDATE_AFTER:
                            physicalDataList.get(i).put("__op", String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
                            record = JSON.toJSONString(physicalDataList.get(i));
                            break;
                        case DELETE:
                            physicalDataList.get(i).put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
                            record = JSON.toJSONString(physicalDataList.get(i));
                            break;
                        case UPDATE_BEFORE:
                            // The update-before image is deleted; guard against events
                            // with fewer before-images than physical rows.
                            if (updateBeforeList != null && updateBeforeList.size() > i) {
                                updateBeforeList.get(i).put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
                                record = JSON.toJSONString(updateBeforeList.get(i));
                            }
                            break;
                        default:
                            throw new RuntimeException("Unrecognized row kind:" + rowKind);
                    }
                    if (StringUtils.isNotBlank(record)) {
                        sinkManager.writeRecords(databaseName, tableName, record);
                    }
                }
            }
        } else {
            // Single-table mode: transform and serialize the row, then buffer it.
            String record = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), record);
        }

        totalInvokeRows.inc(1);
        totalInvokeRowsTime.inc(System.nanoTime() - start);
    }

    /**
     * Restores metric state (when available) and, for EXACTLY_ONCE semantics only,
     * acquires the "buffered-rows" operator state used to carry unflushed batches
     * across checkpoints.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.inlongMetric != null) {
            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>(INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
                    })));
        }
        if (context.isRestored()) {
            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
        }

        // Buffer state is only needed for exactly-once; other semantics flush eagerly.
        if (!StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            return;
        }
        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> descriptor = new ListStateDescriptor<>(
                "buffered-rows", TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
                }));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    }

    /**
     * Checkpoint hook: persists metric counters, then either stores the current buffer
     * into operator state (EXACTLY_ONCE) or force-flushes everything (other semantics).
     */
    @Override
    public synchronized void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (metricData != null && metricStateListState != null) {
            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
                    getRuntimeContext().getIndexOfThisSubtask());
        }

        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            flushPreviousState();
            // save state: the unflushed batch rides along with this checkpoint
            checkpointedState.add(sinkManager.getBufferedBatchMap());
            return;
        }
        sinkManager.flush(null, true);
    }

    // @Override intentionally commented out: SinkFunction#finish() only exists from
    // Flink 1.14 on; keeping the method un-annotated lets this class compile on 1.13
    // while still draining data when a newer runtime invokes finish().
    public synchronized void finish() throws Exception {
        // super.finish(); -- not available on the 1.13 parent class, see note above
        LOG.info("StarRocks sink is draining the remaining data.");
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            flushPreviousState();
        }
        sinkManager.flush(null, true);
    }

    /** Closes the sink manager (stops the flush scheduler and releases connections). */
    @Override
    public synchronized void close() throws Exception {
        super.close();
        sinkManager.close();
    }

    /**
     * Flushes every batch saved at the previous checkpoint, then clears the state.
     * Must only be called when {@link #checkpointedState} has been initialized,
     * i.e. under EXACTLY_ONCE semantics.
     */
    private void flushPreviousState() throws Exception {
        // flush the batch saved at the previous checkpoint
        for (Map<String, StarRocksSinkBufferEntity> state : checkpointedState.get()) {
            sinkManager.setBufferedBatchMap(state);
            sinkManager.flush(null, true);
        }
        checkpointedState.clear();
    }
}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
new file mode 100644
index 000000000..630a49213
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+public class StarRocksDynamicTableSink implements DynamicTableSink {
+
+    private transient TableSchema flinkSchema;
+    private StarRocksSinkOptions sinkOptions;
+    private final boolean multipleSink;
+    private final String sinkMultipleFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+    public StarRocksDynamicTableSink(StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            boolean multipleSink,
+            String sinkMultipleFormat,
+            String databasePattern,
+            String tablePattern,
+            String inlongMetric,
+            String auditHostAndPorts,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+        this.flinkSchema = schema;
+        this.sinkOptions = sinkOptions;
+        this.multipleSink = multipleSink;
+        this.sinkMultipleFormat = sinkMultipleFormat;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return requestedMode;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final TypeInformation<RowData> rowDataTypeInfo = 
context.createTypeInformation(flinkSchema.toRowDataType());
+        StarRocksDynamicSinkFunction<RowData> starrocksSinkFunction = new 
StarRocksDynamicSinkFunction<>(sinkOptions,
+                flinkSchema,
+                new StarRocksTableRowTransformer(rowDataTypeInfo),
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                inlongMetric,
+                auditHostAndPorts,
+                schemaUpdatePolicy);
+        return SinkFunctionProvider.of(starrocksSinkFunction, 
sinkOptions.getSinkParallelism());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new StarRocksDynamicTableSink(sinkOptions,
+                flinkSchema,
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                inlongMetric,
+                auditHostAndPorts,
+                schemaUpdatePolicy);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "starrocks_sink";
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
new file mode 100644
index 000000000..b3fc5ea5e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+public class StarRocksDynamicTableSinkFactory implements 
DynamicTableSinkFactory {
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX);
+        ReadableConfig options = helper.getOptions();
+        // validate some special properties
+        StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options, 
context.getCatalogTable().getOptions());
+        sinkOptions.enableUpsertDelete();
+        TableSchema physicalSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+        String sinkMultipleFormat = 
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
+        String databasePattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
+        String tablePattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
+        SchemaUpdateExceptionPolicy schemaUpdatePolicy = 
helper.getOptions().get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        String inlongMetric = 
helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+        String auditHostAndPorts = 
helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+
+        validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern);
+
+        return new StarRocksDynamicTableSink(sinkOptions,
+                physicalSchema,
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                inlongMetric,
+                auditHostAndPorts,
+                schemaUpdatePolicy);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "starrocks-inlong";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(StarRocksSinkOptions.JDBC_URL);
+        requiredOptions.add(StarRocksSinkOptions.LOAD_URL);
+        requiredOptions.add(StarRocksSinkOptions.DATABASE_NAME);
+        requiredOptions.add(StarRocksSinkOptions.TABLE_NAME);
+        requiredOptions.add(StarRocksSinkOptions.USERNAME);
+        requiredOptions.add(StarRocksSinkOptions.PASSWORD);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE);
+        optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_MAX_ROWS);
+        optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL);
+        optionalOptions.add(StarRocksSinkOptions.SINK_MAX_RETRIES);
+        optionalOptions.add(StarRocksSinkOptions.SINK_SEMANTIC);
+        optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT);
+        optionalOptions.add(StarRocksSinkOptions.SINK_PARALLELISM);
+        optionalOptions.add(StarRocksSinkOptions.SINK_LABEL_PREFIX);
+        optionalOptions.add(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT);
+        optionalOptions.add(SINK_MULTIPLE_FORMAT);
+        optionalOptions.add(SINK_MULTIPLE_DATABASE_PATTERN);
+        optionalOptions.add(SINK_MULTIPLE_TABLE_PATTERN);
+        optionalOptions.add(SINK_MULTIPLE_ENABLE);
+        optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        optionalOptions.add(INLONG_METRIC);
+        optionalOptions.add(INLONG_AUDIT);
+        return optionalOptions;
+    }
+
+    private void validateSinkMultiple(DataType physicalDataType, boolean 
multipleSink, String sinkMultipleFormat,
+            String databasePattern, String tablePattern) {
+        if (multipleSink) {
+            if (StringUtils.isBlank(databasePattern)) {
+                throw new ValidationException("The option 
'sink.multiple.database-pattern'"
+                        + " is not allowed blank when the option 
'sink.multiple.enable' is 'true'");
+            }
+            if (StringUtils.isBlank(tablePattern)) {
+                throw new ValidationException("The option 
'sink.multiple.table-pattern' "
+                        + "is not allowed blank when the option 
'sink.multiple.enable' is 'true'");
+            }
+            if (StringUtils.isBlank(sinkMultipleFormat)) {
+                throw new ValidationException("The option 
'sink.multiple.format' "
+                        + "is not allowed blank when the option 
'sink.multiple.enable' is 'true'");
+            }
+            DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+            Set<String> supportFormats = 
DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
+            if (!supportFormats.contains(sinkMultipleFormat)) {
+                throw new ValidationException(
+                        String.format("Unsupported value '%s' for '%s'. " + 
"Supported values are %s.",
+                                sinkMultipleFormat, 
SINK_MULTIPLE_FORMAT.key(), supportFormats));
+            }
+            if (physicalDataType.getLogicalType() instanceof VarBinaryType) {
+                throw new ValidationException("Only supports 'BYTES' or 
'VARBINARY(n)' of PhysicalDataType "
+                        + "when the option 'sink.multiple.enable' is 'true'");
+            }
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..fe39d4a26
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.starrocks.table.sink.StarRocksDynamicTableSinkFactory
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index cf629fe6a..507561321 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -233,6 +233,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-starrocks</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 5a0d591b5..b268d8223 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -612,6 +612,15 @@
     Source  : org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 (Please 
note that the software have been modified.)
     License : 
https://github.com/apache/doris-flink-connector/blob/1.13_2.11-1.0.3/LICENSE.txt
 
+1.3.13 
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+       
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
+       
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+       
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
+       
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+
+  Source  : com.starrocks:flink-connector-starrocks:1.2.3_flink-1.13_2.11 
(Please note that the software have been modified.)
+  License : https://www.apache.org/licenses/LICENSE-2.0.txt
+
 
 =======================================================================
 Apache InLong Subcomponents:


Reply via email to