This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee0ead08 [INLONG-6747][Sort] StarRocks connector supports
transferring all tables for all schemas in one database (#6748)
5ee0ead08 is described below
commit 5ee0ead0825ce45663f71b8e03a81281be6c9511
Author: Liao Rui <[email protected]>
AuthorDate: Fri Dec 9 17:57:51 2022 +0800
[INLONG-6747][Sort] StarRocks connector supports transferring all tables
for all schemas in one database (#6748)
Co-authored-by: ryanrliao <[email protected]>
---
.../src/main/assemblies/sort-connectors.xml | 8 +
.../{StarRocksConstant.java => Constant.java} | 49 +-
.../sort/protocol/constant/StarRocksConstant.java | 18 -
.../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
.../sort/protocol/node/load/StarRocksLoadNode.java | 11 +-
.../base/sink/SchemaUpdateExceptionPolicy.java | 1 +
inlong-sort/sort-connectors/pom.xml | 1 +
inlong-sort/sort-connectors/starrocks/pom.xml | 83 +++
.../starrocks/manager/StarRocksSinkManager.java | 596 +++++++++++++++++++++
.../manager/StarRocksStreamLoadVisitor.java | 322 +++++++++++
.../table/sink/StarRocksDynamicSinkFunction.java | 342 ++++++++++++
.../table/sink/StarRocksDynamicTableSink.java | 101 ++++
.../sink/StarRocksDynamicTableSinkFactory.java | 149 ++++++
.../org.apache.flink.table.factories.Factory | 16 +
inlong-sort/sort-core/pom.xml | 6 +
licenses/inlong-sort-connectors/LICENSE | 9 +
17 files changed, 1648 insertions(+), 72 deletions(-)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors.xml
b/inlong-distribution/src/main/assemblies/sort-connectors.xml
index 83a4eda7a..0f7c2f01e 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors.xml
@@ -163,6 +163,14 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+
<directory>../inlong-sort/sort-connectors/starrocks/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-starrocks-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
<!-- module's 3td-licenses, notices-->
<fileSet>
<directory>../licenses/inlong-sort-connectors</directory>
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
similarity index 53%
copy from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
index 8813a4e92..b425cfea6 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
@@ -18,54 +18,9 @@
package org.apache.inlong.sort.protocol.constant;
/**
- * StarRocks options constant
+ * common options constant
*/
-public class StarRocksConstant {
-
- /**
- * 'connector' = 'starrocks-inlong'
- */
- public static final String CONNECTOR = "connector";
-
- /**
- * Host of the stream load like:
`jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...`.
- */
- public static final String JDBC_URL = "jdbc-url";
-
- /**
- * Host of the stream load like:
`fe_ip1:http_port;fe_ip2:http_port;fe_ip3:http_port`.
- */
- public static final String LOAD_URL = "load-url";
-
- /**
- * StarRocks user name.
- */
- public static final String USERNAME = "username";
-
- /**
- * StarRocks user password.
- */
- public static final String PASSWORD = "password";
-
- /**
- * StarRocks stream load format, support json and csv.
- */
- public static final String FORMAT = "sink.properties.format";
-
- /**
- * StarRocks stream load strip outer array for json format.
- */
- public static final String STRIP_OUTER_ARRAY =
"sink.properties.strip_outer_array";
-
- /**
- * Database name of the stream load.
- */
- public static final String DATABASE_NAME = "database-name";
-
- /**
- * Table name of the stream load.
- */
- public static final String TABLE_NAME = "table-name";
+public class Constant {
/**
* The multiple enable of sink
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
index 8813a4e92..cfc716dc3 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/StarRocksConstant.java
@@ -67,22 +67,4 @@ public class StarRocksConstant {
*/
public static final String TABLE_NAME = "table-name";
- /**
- * The multiple enable of sink
- */
- public static final String SINK_MULTIPLE_ENABLE = "sink.multiple.enable";
-
- /**
- * The multiple format of sink
- */
- public static final String SINK_MULTIPLE_FORMAT = "sink.multiple.format";
-
- /**
- * The multiple database-pattern of sink
- */
- public static final String SINK_MULTIPLE_DATABASE_PATTERN =
"sink.multiple.database-pattern";
- /**
- * The multiple table-pattern of sink
- */
- public static final String SINK_MULTIPLE_TABLE_PATTERN =
"sink.multiple.table-pattern";
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index ad3a19baa..72cc42931 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -42,6 +42,7 @@ import
org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -69,7 +70,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name =
"greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name =
"dlcIcebergLoad"),
- @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad")
+ @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+ @JsonSubTypes.Type(value = StarRocksLoadNode.class, name =
"starRocksLoad")
})
@NoArgsConstructor
@Data
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index dc1f6cf27..7f37075e0 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -47,6 +47,7 @@ import
org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -88,7 +89,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name =
"greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name =
"dlcIcebergLoad"),
- @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad")
+ @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+ @JsonSubTypes.Type(value = StarRocksLoadNode.class, name =
"starRocksLoad"),
})
public interface Node {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
index 7b7e1455c..1c64c7d44 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/StarRocksLoadNode.java
@@ -34,6 +34,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.constant.Constant;
import org.apache.inlong.sort.protocol.constant.StarRocksConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -145,13 +146,13 @@ public class StarRocksLoadNode extends LoadNode
implements InlongMetric, Seriali
options.put(StarRocksConstant.DATABASE_NAME, databaseName);
options.put(StarRocksConstant.TABLE_NAME, tableName);
if (sinkMultipleEnable != null && sinkMultipleEnable) {
- options.put(StarRocksConstant.SINK_MULTIPLE_ENABLE,
sinkMultipleEnable.toString());
- options.put(StarRocksConstant.SINK_MULTIPLE_FORMAT,
+ options.put(Constant.SINK_MULTIPLE_ENABLE,
sinkMultipleEnable.toString());
+ options.put(Constant.SINK_MULTIPLE_FORMAT,
Objects.requireNonNull(sinkMultipleFormat).identifier());
- options.put(StarRocksConstant.SINK_MULTIPLE_DATABASE_PATTERN,
databasePattern);
- options.put(StarRocksConstant.SINK_MULTIPLE_TABLE_PATTERN,
tablePattern);
+ options.put(Constant.SINK_MULTIPLE_DATABASE_PATTERN,
databasePattern);
+ options.put(Constant.SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
} else {
- options.put(StarRocksConstant.SINK_MULTIPLE_ENABLE, "false");
+ options.put(Constant.SINK_MULTIPLE_ENABLE, "false");
}
options.put(StarRocksConstant.FORMAT, "json");
options.put(StarRocksConstant.STRIP_OUTER_ARRAY, "true");
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
index 82295591d..7a48eef85 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
@@ -33,6 +33,7 @@ public enum SchemaUpdateExceptionPolicy {
TRY_IT_BEST("Try it best to handle schema update, if can not handle it,
just ignore it."),
LOG_WITH_IGNORE("Ignore schema update and log it."),
ALERT_WITH_IGNORE("Ignore schema update and alert it."),
+ STOP_PARTIAL("Only stop abnormal sink table, other tables writes
normally."),
THROW_WITH_STOP("Throw exception to stop flink job when meet schema
update.");
private String description;
diff --git a/inlong-sort/sort-connectors/pom.xml
b/inlong-sort/sort-connectors/pom.xml
index e40cb5a8a..801a401a9 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -54,6 +54,7 @@
<module>tubemq</module>
<module>filesystem</module>
<module>doris</module>
+ <module>starrocks</module>
<module>hudi</module>
</modules>
diff --git a/inlong-sort/sort-connectors/starrocks/pom.xml
b/inlong-sort/sort-connectors/starrocks/pom.xml
new file mode 100644
index 000000000..558a2054f
--- /dev/null
+++ b/inlong-sort/sort-connectors/starrocks/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>sort-connector-starrocks</artifactId>
+
+ <name>Apache InLong - Sort-connector-starrocks</name>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <starrocks-connector.version>1.2.3</starrocks-connector.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.starrocks</groupId>
+ <artifactId>flink-connector-starrocks</artifactId>
+
<version>${starrocks-connector.version}_flink-${flink.minor.version}_${flink.scala.binary.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+
<shadedPattern>org.apache.inlong.sort.starrocks.shaded.org.apache.inlong.sort.base</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
new file mode 100644
index 000000000..a6952eb6d
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.manager;
+
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import
com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StarRocks sink manager which caches and flushes data.
+ */
+public class StarRocksSinkManager implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StarRocksSinkManager.class);
+
+ private final StarRocksJdbcConnectionProvider jdbcConnProvider;
+ private final StarRocksQueryVisitor starrocksQueryVisitor;
+ private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+ private final StarRocksSinkOptions sinkOptions;
+ final LinkedBlockingDeque<StarRocksSinkBufferEntity> flushQueue = new
LinkedBlockingDeque<>(1);
+
+ private transient Counter totalFlushBytes;
+ private transient Counter totalFlushRows;
+ private transient Counter totalFlushTime;
+ private transient Counter totalFlushTimeWithoutRetries;
+ private transient Counter totalFlushSucceededTimes;
+ private transient Counter totalFlushFailedTimes;
+ private transient Histogram flushTimeNs;
+ private transient Histogram offerTimeNs;
+
+ private transient Counter totalFilteredRows;
+ private transient Histogram commitAndPublishTimeMs;
+ private transient Histogram streamLoadPutTimeMs;
+ private transient Histogram readDataTimeMs;
+ private transient Histogram writeDataTimeMs;
+ private transient Histogram loadTimeMs;
+
+ private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
+ private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
+ private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES
= "totalFlushTimeNsWithoutRetries";
+ private static final String COUNTER_TOTAL_FLUSH_COST_TIME =
"totalFlushTimeNs";
+ private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES =
"totalFlushSucceededTimes";
+ private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES =
"totalFlushFailedTimes";
+ private static final String HISTOGRAM_FLUSH_TIME = "flushTimeNs";
+ private static final String HISTOGRAM_OFFER_TIME_NS = "offerTimeNs";
+
+ // from stream load result
+ private static final String COUNTER_NUMBER_FILTERED_ROWS =
"totalFilteredRows";
+ private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS =
"commitAndPublishTimeMs";
+ private static final String HISTOGRAM_STREAM_LOAD_PUT_TIME_MS =
"streamLoadPutTimeMs";
+ private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
+ private static final String HISTOGRAM_WRITE_DATA_TIME_MS =
"writeDataTimeMs";
+ private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
+
+ private final Map<String, StarRocksSinkBufferEntity> bufferMap = new
ConcurrentHashMap<>();
+ private static final long FLUSH_QUEUE_POLL_TIMEOUT = 3000;
+ private volatile boolean closed = false;
+ private volatile boolean flushThreadAlive = false;
+ private volatile Throwable flushException;
+
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> scheduledFuture;
+
+ private final boolean multipleSink;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private transient SinkMetricData metricData;
+
+ /**
+ * If a table writing throws exception, ignore it when receiving data
later again
+ */
+ private Set<String> ignoreWriteTables = new HashSet<>();
+
+ public void setSinkMetricData(SinkMetricData metricData) {
+ this.metricData = metricData;
+ }
+
+ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
+ TableSchema flinkSchema,
+ boolean multipleSink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+ this.sinkOptions = sinkOptions;
+ StarRocksJdbcConnectionOptions jdbcOptions = new
StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(),
+ sinkOptions.getUsername(), sinkOptions.getPassword());
+ this.jdbcConnProvider = new
StarRocksJdbcConnectionProvider(jdbcOptions);
+ this.starrocksQueryVisitor = new
StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(),
+ sinkOptions.getTableName());
+
+ this.multipleSink = multipleSink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+
+ init(flinkSchema);
+ }
+
+ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
+ TableSchema flinkSchema,
+ StarRocksJdbcConnectionProvider jdbcConnProvider,
+ StarRocksQueryVisitor starrocksQueryVisitor,
+ boolean multipleSink,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+ this.sinkOptions = sinkOptions;
+ this.jdbcConnProvider = jdbcConnProvider;
+ this.starrocksQueryVisitor = starrocksQueryVisitor;
+
+ this.multipleSink = multipleSink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+
+ init(flinkSchema);
+ }
+
+ protected void init(TableSchema schema) {
+ if (!multipleSink) {
+ validateTableStructure(schema);
+ }
+ String version = starrocksQueryVisitor.getStarRocksVersion();
+ this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
+ sinkOptions,
+ null == schema ? new String[]{} : schema.getFieldNames(),
+ version.length() > 0 && !version.trim().startsWith("1."));
+ }
+
+ public void setRuntimeContext(RuntimeContext runtimeCtx) {
+ totalFlushBytes =
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
+ totalFlushRows =
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
+ totalFlushTime =
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
+ totalFlushTimeWithoutRetries = runtimeCtx.getMetricGroup()
+ .counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
+ totalFlushSucceededTimes =
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
+ totalFlushFailedTimes =
runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
+ flushTimeNs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ offerTimeNs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+
+ totalFilteredRows =
runtimeCtx.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
+ commitAndPublishTimeMs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ streamLoadPutTimeMs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PUT_TIME_MS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ readDataTimeMs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ writeDataTimeMs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ loadTimeMs =
runtimeCtx.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS,
+ new
DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+ }
+
+ public void startAsyncFlushing() {
+ // start flush thread
+ Thread flushThread = new Thread(() -> {
+ while (true) {
+ try {
+ if (!asyncFlush()) {
+ LOGGER.info("StarRocks flush thread is about to
exit.");
+ flushThreadAlive = false;
+ break;
+ }
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ });
+
+ flushThread.setUncaughtExceptionHandler((t, e) -> {
+ LOGGER.error("StarRocks flush thread uncaught exception occurred:
" + e.getMessage(), e);
+ flushException = e;
+ flushThreadAlive = false;
+ });
+ flushThread.setName("starrocks-flush");
+ flushThread.setDaemon(true);
+ flushThread.start();
+ flushThreadAlive = true;
+ }
+
+ public void startScheduler() throws IOException {
+ if
(StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+ return;
+ }
+ stopScheduler();
+ this.scheduler = Executors.newScheduledThreadPool(1, new
ExecutorThreadFactory("starrocks-interval-sink"));
+ this.scheduledFuture = this.scheduler.schedule(() -> {
+ synchronized (StarRocksSinkManager.this) {
+ if (!closed) {
+ try {
+ LOGGER.info("StarRocks interval Sinking triggered.");
+ if (bufferMap.isEmpty()) {
+ startScheduler();
+ }
+ flush(null, false);
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ }, sinkOptions.getSinkMaxFlushInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler() {
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ }
+
+ public final synchronized void writeRecords(String database, String table,
String... records) throws IOException {
+ checkFlushException();
+ try {
+ if (0 == records.length) {
+ return;
+ }
+ String bufferKey = String.format("%s,%s", database, table);
+ StarRocksSinkBufferEntity bufferEntity =
bufferMap.computeIfAbsent(bufferKey,
+ k -> new StarRocksSinkBufferEntity(database, table,
sinkOptions.getLabelPrefix()));
+ for (String record : records) {
+ byte[] bts = record.getBytes(StandardCharsets.UTF_8);
+ bufferEntity.addToBuffer(bts);
+ }
+ if
(StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+ return;
+ }
+ if (bufferEntity.getBatchCount() >= sinkOptions.getSinkMaxRows()
+ || bufferEntity.getBatchSize() >=
sinkOptions.getSinkMaxBytes()) {
+ LOGGER.info(
+ String.format("StarRocks buffer Sinking triggered: db:
[%s] table: [%s] rows[%d] label[%s].",
+ database, table, bufferEntity.getBatchCount(),
bufferEntity.getLabel()));
+ flush(bufferKey, false);
+ }
+ } catch (Exception e) {
+ throw new IOException("Writing records to StarRocks failed.", e);
+ }
+ }
+
+ public synchronized void flush(String bufferKey, boolean waitUtilDone)
throws Exception {
+ if (bufferMap.isEmpty()) {
+ flushInternal(null, waitUtilDone);
+ return;
+ }
+ if (null == bufferKey) {
+ for (String key : bufferMap.keySet()) {
+ flushInternal(key, waitUtilDone);
+ }
+ return;
+ }
+ flushInternal(bufferKey, waitUtilDone);
+ }
+
+ private synchronized void flushInternal(String bufferKey, boolean
waitUtilDone) throws Exception {
+ checkFlushException();
+ if (null == bufferKey || bufferMap.isEmpty() ||
!bufferMap.containsKey(bufferKey)) {
+ if (waitUtilDone) {
+ waitAsyncFlushingDone();
+ }
+ return;
+ }
+ offer(bufferMap.get(bufferKey));
+ bufferMap.remove(bufferKey);
+ if (waitUtilDone) {
+ // wait the last flush
+ waitAsyncFlushingDone();
+ }
+ }
+
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ LOGGER.info("StarRocks Sink is about to close.");
+ this.bufferMap.clear();
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+ if (jdbcConnProvider != null) {
+ jdbcConnProvider.close();
+ }
+
+ offerEOF();
+ }
+ checkFlushException();
+ }
+
+ public Map<String, StarRocksSinkBufferEntity> getBufferedBatchMap() {
+ Map<String, StarRocksSinkBufferEntity> clone = new HashMap<>();
+ clone.putAll(bufferMap);
+ return clone;
+ }
+
+ public void setBufferedBatchMap(Map<String, StarRocksSinkBufferEntity>
bufferMap) throws IOException {
+ if
(!StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
+ return;
+ }
+ this.bufferMap.clear();
+ this.bufferMap.putAll(bufferMap);
+ }
+
+ /**
+ * async flush
+ *
+ * @return false if met eof and flush thread will exit.
+ */
+ private boolean asyncFlush() throws Exception {
+ StarRocksSinkBufferEntity flushData =
flushQueue.poll(FLUSH_QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (flushData == null || (0 == flushData.getBatchCount() &&
!flushData.EOF())) {
+ return true;
+ }
+ if (flushData.EOF()) {
+ return false;
+ }
+ stopScheduler();
+
+ String tableIdentifier = flushData.getDatabase() + "." +
flushData.getTable();
+
+ if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == schemaUpdatePolicy &&
ignoreWriteTables.contains(
+ tableIdentifier)) {
+ LOGGER.warn(
+ String.format("Stop writing to db[%s] table[%s] because of
former errors and stop_partial policy",
+ flushData.getDatabase(), flushData.getTable()));
+ return true;
+ }
+
+ LOGGER.info(String.format("Async stream load: db[%s] table[%s]
rows[%d] bytes[%d] label[%s].",
+ flushData.getDatabase(), flushData.getTable(),
flushData.getBatchCount(), flushData.getBatchSize(),
+ flushData.getLabel()));
+ long startWithRetries = System.nanoTime();
+ for (int i = 0; i <= sinkOptions.getSinkMaxRetries(); i++) {
+ try {
+ long start = System.nanoTime();
+ // flush to StarRocks with stream load
+ Map<String, Object> result =
starrocksStreamLoadVisitor.doStreamLoad(flushData);
+ LOGGER.info(String.format("Async stream load finished:
label[%s].", flushData.getLabel()));
+ // metrics
+ if (null != totalFlushBytes) {
+ totalFlushBytes.inc(flushData.getBatchSize());
+ totalFlushRows.inc(flushData.getBatchCount());
+ totalFlushTime.inc(System.nanoTime() - startWithRetries);
+ totalFlushTimeWithoutRetries.inc(System.nanoTime() -
start);
+ totalFlushSucceededTimes.inc();
+ flushTimeNs.update(System.nanoTime() - start);
+ updateMetricsFromStreamLoadResult(result);
+
+ if (null != metricData) {
+ metricData.invoke(flushData.getBatchCount(),
flushData.getBatchSize());
+ }
+ }
+ startScheduler();
+ break;
+ } catch (Exception e) {
+ if (totalFlushFailedTimes != null) {
+ totalFlushFailedTimes.inc();
+ }
+ LOGGER.warn("Failed to flush batch data to StarRocks, retry
times = {}", i, e);
+ if (i >= sinkOptions.getSinkMaxRetries()) {
+ if (schemaUpdatePolicy == null
+ || schemaUpdatePolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+ throw e;
+ } else if (schemaUpdatePolicy ==
SchemaUpdateExceptionPolicy.STOP_PARTIAL) {
+ ignoreWriteTables.add(tableIdentifier);
+ }
+ }
+ if (e instanceof StarRocksStreamLoadFailedException
+ && ((StarRocksStreamLoadFailedException)
e).needReCreateLabel()) {
+ String oldLabel = flushData.getLabel();
+ flushData.reGenerateLabel();
+ LOGGER.warn(String.format("Batch label changed from [%s]
to [%s]", oldLabel, flushData.getLabel()));
+ }
+ try {
+ Thread.sleep(1000L * Math.min(i + 1, 10));
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Unable to flush, interrupted while
doing another attempt", e);
+ }
+ }
+ }
+ return true;
+ }
+
    /**
     * Blocks until every buffer queued before this call has been processed by the async
     * flush thread, then surfaces any failure the flush thread recorded.
     *
     * @throws InterruptedException if interrupted while waiting for queue capacity
     */
    private void waitAsyncFlushingDone() throws InterruptedException {
        // wait for previous flushings
        // Two empty sentinel entities are enqueued; by the time both have been consumed,
        // all earlier real buffers must have been flushed.
        // NOTE(review): the double-offer mirrors the upstream StarRocks connector — confirm
        // the flush loop drains two entries per cycle before relying on this invariant.
        offer(new StarRocksSinkBufferEntity(null, null, null));
        offer(new StarRocksSinkBufferEntity(null, null, null));
        // Rethrow any exception stored by the flush thread.
        checkFlushException();
    }
+
+ void offer(StarRocksSinkBufferEntity bufferEntity) throws
InterruptedException {
+ if (!flushThreadAlive) {
+ LOGGER.info(String.format("Flush thread already exit, ignore offer
request for label[%s]",
+ bufferEntity.getLabel()));
+ return;
+ }
+
+ long start = System.nanoTime();
+ if (!flushQueue.offer(bufferEntity, sinkOptions.getSinkOfferTimeout(),
TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException(
+ "Timeout while offering data to flushQueue, exceed " +
sinkOptions.getSinkOfferTimeout()
+ + " ms, see " +
StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT.key());
+ }
+ if (offerTimeNs != null) {
+ offerTimeNs.update(System.nanoTime() - start);
+ }
+ }
+
+ private void offerEOF() {
+ try {
+ offer(new StarRocksSinkBufferEntity(null, null, null).asEOF());
+ } catch (Exception e) {
+ LOGGER.warn("Writing EOF failed.", e);
+ }
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (int i = 0; i < stack.length; i++) {
+ LOGGER.info(
+ stack[i].getClassName() + "." +
stack[i].getMethodName() + " line:" + stack[i].getLineNumber());
+ }
+ throw new RuntimeException("Writing records to StarRocks failed.",
flushException);
+ }
+ }
+
+ private void validateTableStructure(TableSchema flinkSchema) {
+ if (null == flinkSchema) {
+ return;
+ }
+ Optional<UniqueConstraint> constraint = flinkSchema.getPrimaryKey();
+ List<Map<String, Object>> rows =
starrocksQueryVisitor.getTableColumnsMetaData();
+ if (rows == null || rows.isEmpty()) {
+ throw new IllegalArgumentException("Couldn't get the sink table's
column info.");
+ }
+ // validate primary keys
+ List<String> primayKeys = new ArrayList<>();
+ for (int i = 0; i < rows.size(); i++) {
+ String keysType = rows.get(i).get("COLUMN_KEY").toString();
+ if (!"PRI".equals(keysType)) {
+ continue;
+ }
+
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
+ }
+ if (!primayKeys.isEmpty()) {
+ if (!constraint.isPresent()) {
+ throw new IllegalArgumentException("Primary keys not defined
in the sink `TableSchema`.");
+ }
+ if (constraint.get().getColumns().size() != primayKeys.size() ||
!constraint.get().getColumns().stream()
+ .allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
+ throw new IllegalArgumentException(
+ "Primary keys of the flink `TableSchema` do not match
with the ones from starrocks table.");
+ }
+ sinkOptions.enableUpsertDelete();
+ }
+
+ if (sinkOptions.hasColumnMappingProperty()) {
+ return;
+ }
+ if (flinkSchema.getFieldCount() != rows.size()) {
+ throw new IllegalArgumentException(
+ "Fields count of " + this.sinkOptions.getTableName() + "
mismatch. \nflinkSchema["
+ + flinkSchema.getFieldNames().length + "]:"
+ +
Arrays.asList(flinkSchema.getFieldNames()).stream().collect(Collectors.joining(","))
+ + "\n realTab[" + rows.size() + "]:"
+ + rows.stream().map((r) ->
String.valueOf(r.get("COLUMN_NAME")))
+ .collect(Collectors.joining(",")));
+ }
+ List<TableColumn> flinkCols = flinkSchema.getTableColumns();
+ for (int i = 0; i < rows.size(); i++) {
+ String starrocksField =
rows.get(i).get("COLUMN_NAME").toString().toLowerCase();
+ String starrocksType =
rows.get(i).get("DATA_TYPE").toString().toLowerCase();
+ List<TableColumn> matchedFlinkCols = flinkCols.stream()
+ .filter(col ->
col.getName().toLowerCase().equals(starrocksField)
+ && (!typesMap.containsKey(starrocksType) ||
typesMap.get(starrocksType)
+
.contains(col.getType().getLogicalType().getTypeRoot())))
+ .collect(Collectors.toList());
+ if (matchedFlinkCols.isEmpty()) {
+ throw new IllegalArgumentException("Fields name or type
mismatch for:" + starrocksField);
+ }
+ }
+ }
+
+ private void updateMetricsFromStreamLoadResult(Map<String, Object> result)
{
+ if (result != null) {
+ updateHisto(result, "CommitAndPublishTimeMs",
this.commitAndPublishTimeMs);
+ updateHisto(result, "StreamLoadPutTimeMs",
this.streamLoadPutTimeMs);
+ updateHisto(result, "ReadDataTimeMs", this.readDataTimeMs);
+ updateHisto(result, "WriteDataTimeMs", this.writeDataTimeMs);
+ updateHisto(result, "LoadTimeMs", this.loadTimeMs);
+ updateCounter(result, "NumberFilteredRows",
this.totalFilteredRows);
+ }
+ }
+
+ private void updateCounter(Map<String, Object> result, String key, Counter
counter) {
+ if (result.containsKey(key)) {
+ Object val = result.get(key);
+ if (val != null) {
+ try {
+ long longValue = Long.parseLong(val.toString());
+ counter.inc(longValue);
+ } catch (Exception e) {
+ LOGGER.warn("Parse stream load result metric error", e);
+ }
+ }
+ }
+ }
+
+ private void updateHisto(Map<String, Object> result, String key, Histogram
histogram) {
+ if (result.containsKey(key)) {
+ Object val = result.get(key);
+ if (val != null) {
+ try {
+ long longValue = Long.parseLong(val.toString());
+ histogram.update(longValue);
+ } catch (Exception e) {
+ LOGGER.warn("Parse stream load result metric error", e);
+ }
+ }
+ }
+ }
+
    // Mapping from StarRocks column type (lower-cased DATA_TYPE from information_schema)
    // to the Flink logical type roots accepted for that column. Consulted by
    // validateTableStructure(); populated once in the static initializer below and
    // never mutated afterwards.
    private static final Map<String, List<LogicalTypeRoot>> typesMap = new HashMap<>();

    static {
        // validate table structure
        typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BINARY));
        typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
        typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
        typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
        typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
        typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
        typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
        typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY,
                LogicalTypeRoot.BOOLEAN));
        typesMap.put("smallint",
                Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        // Complex Flink types (ARRAY/MAP/ROW) are serialized to text, hence varchar/string.
        typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP,
                LogicalTypeRoot.ROW));
        typesMap.put("string",
                Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP,
                        LogicalTypeRoot.ROW));
    }
+
+}
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
new file mode 100644
index 000000000..53ca016c3
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.manager;
+
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException;
+import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser;
+import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.shade.com.alibaba.fastjson.JSON;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StarRocksStreamLoadVisitor implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
+
+ private static final int ERROR_LOG_MAX_LENGTH = 3000;
+
+ private final StarRocksSinkOptions sinkOptions;
+ private final String[] fieldNames;
+ private long pos;
+ private boolean opAutoProjectionInJson;
+ private static final String RESULT_FAILED = "Fail";
+ private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
+ private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
+ private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
+ private static final String RESULT_LABEL_PREPARE = "PREPARE";
+ private static final String RESULT_LABEL_ABORTED = "ABORTED";
+ private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
+
+ public StarRocksStreamLoadVisitor(StarRocksSinkOptions sinkOptions,
String[] fieldNames,
+ boolean opAutoProjectionInJson) {
+ this.fieldNames = fieldNames;
+ this.sinkOptions = sinkOptions;
+ this.opAutoProjectionInJson = opAutoProjectionInJson;
+ }
+
+ public Map<String, Object> doStreamLoad(StarRocksSinkBufferEntity
bufferEntity) throws IOException {
+ String host = getAvailableHost();
+ if (null == host) {
+ throw new IOException("None of the hosts in `load_url` could be
connected.");
+ }
+ String loadUrl = new
StringBuilder(host).append("/api/").append(bufferEntity.getDatabase()).append("/")
+
.append(bufferEntity.getTable()).append("/_stream_load").toString();
+ LOG.info(String.format("Start to join batch data: label[%s].",
bufferEntity.getLabel()));
+ Map<String, Object> loadResult = doHttpPut(loadUrl,
bufferEntity.getLabel(),
+ joinRows(bufferEntity.getBuffer(), (int)
bufferEntity.getBatchSize()));
+ final String keyStatus = "Status";
+ if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+ throw new IOException(
+ "Unable to flush data to StarRocks: unknown result status,
usually caused by: "
+ + "1.authorization or permission related problems.
"
+ + "2.Wrong column_separator or row_delimiter. "
+ + "3.Column count exceeded the limitation.");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Stream Load response: \n%s\n",
JSON.toJSONString(loadResult)));
+ }
+ if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
+ Map<String, String> logMap = new HashMap<>();
+ if (loadResult.containsKey("ErrorURL")) {
+ logMap.put("streamLoadErrorLog", getErrorLog((String)
loadResult.get("ErrorURL")));
+ }
+ throw new StarRocksStreamLoadFailedException(
+ String.format("Failed to flush data to StarRocks, Error "
+ "response: \n%s\n%s\n",
+ JSON.toJSONString(loadResult),
JSON.toJSONString(logMap)),
+ loadResult);
+ } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
+ LOG.error(String.format("Stream Load response: \n%s\n",
JSON.toJSONString(loadResult)));
+ // has to block-checking the state to get the final result
+ checkLabelState(host, bufferEntity.getDatabase(),
bufferEntity.getLabel());
+ }
+ return loadResult;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkLabelState(String host, String database, String label)
throws IOException {
+ int idx = 0;
+ while (true) {
+ try {
+ TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
+ } catch (InterruptedException ex) {
+ break;
+ }
+ try (CloseableHttpClient httpclient = HttpClients.createDefault())
{
+ HttpGet httpGet = new HttpGet(
+ new
StringBuilder(host).append("/api/").append(database)
+
.append("/get_load_state?label=").append(label).toString());
+ httpGet.setHeader("Authorization",
+ getBasicAuthHeader(sinkOptions.getUsername(),
sinkOptions.getPassword()));
+ httpGet.setHeader("Connection", "close");
+
+ try (CloseableHttpResponse resp = httpclient.execute(httpGet))
{
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (respEntity == null) {
+ throw new
StarRocksStreamLoadFailedException(String.format(
+ "Failed to flush data to StarRocks, Error "
+ + "could not get the final state of
label[%s].\n",
+ label), null);
+ }
+ Map<String, Object> result = (Map<String, Object>)
JSON.parse(EntityUtils.toString(respEntity));
+ String labelState = (String) result.get("state");
+ if (null == labelState) {
+ throw new
StarRocksStreamLoadFailedException(String.format(
+ "Failed to flush data to StarRocks, Error "
+ + "could not get the final state of
label[%s]. response[%s]\n",
+ label,
+ EntityUtils.toString(respEntity)), null);
+ }
+ LOG.info(String.format("Checking label[%s] state[%s]\n",
label, labelState));
+ switch (labelState) {
+ case LAEBL_STATE_VISIBLE:
+ case LAEBL_STATE_COMMITTED:
+ return;
+ case RESULT_LABEL_PREPARE:
+ continue;
+ case RESULT_LABEL_ABORTED:
+ throw new StarRocksStreamLoadFailedException(
+ String.format("Failed to flush data to
StarRocks, Error " + "label[%s] state[%s]\n",
+ label, labelState),
+ null, true);
+ case RESULT_LABEL_UNKNOWN:
+ default:
+ throw new StarRocksStreamLoadFailedException(
+ String.format("Failed to flush data to
StarRocks, Error " + "label[%s] state[%s]\n",
+ label, labelState),
+ null);
+ }
+ }
+ }
+ }
+ }
+
+ private String getErrorLog(String errorUrl) {
+ if (errorUrl == null || errorUrl.isEmpty() ||
!errorUrl.startsWith("http")) {
+ return null;
+ }
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ HttpGet httpGet = new HttpGet(errorUrl);
+ try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (respEntity == null) {
+ return null;
+ }
+ String errorLog = EntityUtils.toString(respEntity);
+ if (errorLog != null && errorLog.length() >
ERROR_LOG_MAX_LENGTH) {
+ errorLog = errorLog.substring(0, ERROR_LOG_MAX_LENGTH);
+ }
+ return errorLog;
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to get error log.", e);
+ return "Failed to get error log: " + e.getMessage();
+ }
+ }
+
+ private String getAvailableHost() {
+ List<String> hostList = sinkOptions.getLoadUrlList();
+ long tmp = pos + hostList.size();
+ for (; pos < tmp; pos++) {
+ String host = new
StringBuilder("http://").append(hostList.get((int) (pos %
hostList.size()))).toString();
+ if (tryHttpConnection(host)) {
+ return host;
+ }
+ }
+ return null;
+ }
+
+ private boolean tryHttpConnection(String host) {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(sinkOptions.getConnectTimeout());
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e1) {
+ LOG.warn("Failed to connect to address:{}", host, e1);
+ return false;
+ }
+ }
+
+ private byte[] joinRows(List<byte[]> rows, int totalBytes) throws
IOException {
+ if
(StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))
{
+ byte[] lineDelimiter = StarRocksDelimiterParser.parse(
+
sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n")
+ .getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() *
lineDelimiter.length);
+ for (byte[] row : rows) {
+ bos.put(row);
+ bos.put(lineDelimiter);
+ }
+ return bos.array();
+ }
+
+ if
(StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat()))
{
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty()
? 2 : rows.size() + 1));
+ bos.put("[".getBytes(StandardCharsets.UTF_8));
+ byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+ boolean isFirstElement = true;
+ for (byte[] row : rows) {
+ if (!isFirstElement) {
+ bos.put(jsonDelimiter);
+ }
+ bos.put(row);
+ isFirstElement = false;
+ }
+ bos.put("]".getBytes(StandardCharsets.UTF_8));
+ return bos.array();
+ }
+ throw new RuntimeException("Failed to join rows data, unsupported
`format` from stream load properties:");
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, Object> doHttpPut(String loadUrl, String label, byte[]
data) throws IOException {
+ LOG.info(String.format("Executing stream load to: '%s', size: '%s',
thread: %d", loadUrl, data.length,
+ Thread.currentThread().getId()));
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ });
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ HttpPut httpPut = new HttpPut(loadUrl);
+ Map<String, String> props =
sinkOptions.getSinkStreamLoadProperties();
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ httpPut.setHeader(entry.getKey(), entry.getValue());
+ }
+ if (!props.containsKey("columns") &&
((sinkOptions.supportUpsertDelete() && !opAutoProjectionInJson)
+ ||
StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())))
{
+ String cols = String.join(",",
+ Arrays.asList(fieldNames).stream().map(f ->
String.format("`%s`", f.trim().replace("`", "")))
+ .collect(Collectors.toList()));
+ if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
+ cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY);
+ }
+ httpPut.setHeader("columns", cols);
+ }
+ if (!httpPut.containsHeader("timeout")) {
+ httpPut.setHeader("timeout", "60");
+ }
+ httpPut.setHeader("Expect", "100-continue");
+ httpPut.setHeader("label", label);
+ httpPut.setHeader("Authorization",
+ getBasicAuthHeader(sinkOptions.getUsername(),
sinkOptions.getPassword()));
+ httpPut.setEntity(new ByteArrayEntity(data));
+
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (respEntity == null) {
+ return null;
+ }
+ return (Map<String, Object>)
JSON.parse(EntityUtils.toString(respEntity));
+ }
+ }
+ }
+
+ private String getBasicAuthHeader(String username, String password) {
+ String auth = username + ":" + password;
+ byte[] encodedAuth =
Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+ return new StringBuilder("Basic ").append(new
String(encodedAuth)).toString();
+ }
+
+ private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
+ int code = resp.getStatusLine().getStatusCode();
+ if (200 != code) {
+ LOG.warn("Request failed with code:{}", code);
+ return null;
+ }
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ LOG.warn("Request failed with empty response.");
+ return null;
+ }
+ return respEntity;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
new file mode 100644
index 000000000..93a1363ff
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import
com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.shade.com.alibaba.fastjson.JSON;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.NestedRowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.starrocks.manager.StarRocksSinkManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Flink sink function writing records to StarRocks through {@link StarRocksSinkManager}.
 *
 * <p>Supports two modes: a single-table sink (records serialized via the configured
 * serializer/row transformer) and a "multiple" sink, where each record is a canonical
 * JSON payload from which the target database/table are derived by pattern, allowing
 * all tables of a database to flow through one sink.
 *
 * <p>Checkpointed state is used only with {@code StarRocksSinkSemantic.EXACTLY_ONCE};
 * InLong metric state is snapshotted independently of the semantic.
 */
public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSinkFunction.class);

    private StarRocksSinkManager sinkManager;
    private StarRocksIRowTransformer<T> rowTransformer;
    private StarRocksSinkOptions sinkOptions;
    // Null serializer signals "raw" mode: values are pre-serialized strings or carry metadata.
    private StarRocksISerializer serializer;
    private transient Counter totalInvokeRowsTime;
    private transient Counter totalInvokeRows;
    private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
    private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";

    /**
     * state only works with `StarRocksSinkSemantic.EXACTLY_ONCE`
     */
    private transient ListState<Map<String, StarRocksSinkBufferEntity>> checkpointedState;

    // Multiple-sink configuration: when multipleSink is true, database/table are resolved
    // per record from the JSON payload using these patterns.
    private final boolean multipleSink;
    private final String sinkMultipleFormat;
    private final String databasePattern;
    private final String tablePattern;

    // InLong metric/audit wiring.
    private final String inlongMetric;
    private transient SinkMetricData metricData;
    private transient ListState<MetricState> metricStateListState;
    private transient MetricState metricState;
    private final String auditHostAndPorts;

    // Lazily created in invoke(); only used when multipleSink is true.
    private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;

    /**
     * Creates the sink function and eagerly wires the sink manager, query visitor and
     * serializer from the given options and schema.
     *
     * @param sinkOptions        StarRocks connection/load options
     * @param schema             Flink-side table schema
     * @param rowTransformer     transforms incoming values into sink rows
     * @param multipleSink       whether to route records to multiple tables by pattern
     * @param sinkMultipleFormat canonical data format name for multiple-sink payloads
     * @param databasePattern    pattern resolving the target database from a record
     * @param tablePattern       pattern resolving the target table from a record
     * @param inlongMetric       InLong metric labels (null/empty disables metric state)
     * @param auditHostAndPorts  InLong audit endpoint(s)
     * @param schemaUpdatePolicy policy applied when schema-change/load errors occur
     */
    public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions,
            TableSchema schema,
            StarRocksIRowTransformer<T> rowTransformer,
            boolean multipleSink,
            String sinkMultipleFormat,
            String databasePattern,
            String tablePattern,
            String inlongMetric,
            String auditHostAndPorts,
            SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
        StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(),
                sinkOptions.getUsername(), sinkOptions.getPassword());
        StarRocksJdbcConnectionProvider jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
        StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider,
                sinkOptions.getDatabaseName(), sinkOptions.getTableName());
        this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor,
                multipleSink, schemaUpdatePolicy);

        rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
        rowTransformer.setTableSchema(schema);
        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
        this.rowTransformer = rowTransformer;
        this.sinkOptions = sinkOptions;

        this.multipleSink = multipleSink;
        this.sinkMultipleFormat = sinkMultipleFormat;
        this.databasePattern = databasePattern;
        this.tablePattern = tablePattern;
        this.inlongMetric = inlongMetric;
        this.auditHostAndPorts = auditHostAndPorts;
    }

    /**
     * Registers counters, starts the manager's scheduler and async flush thread, and
     * initializes InLong sink metrics (restored from state when available).
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sinkManager.setRuntimeContext(getRuntimeContext());
        totalInvokeRows = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS);
        totalInvokeRowsTime = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS_COST_TIME);
        if (null != rowTransformer) {
            rowTransformer.setRuntimeContext(getRuntimeContext());
        }
        sinkManager.startScheduler();
        sinkManager.startAsyncFlushing();

        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
                .withInlongAudit(auditHostAndPorts)
                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                .withRegisterMetric(MetricOption.RegisteredMetric.ALL).build();
        if (metricOption != null) {
            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
            sinkManager.setSinkMetricData(metricData);
        }
    }

    /**
     * Routes one incoming value to the sink manager. Handles four shapes of input:
     * pre-serialized rows with metadata, raw strings, DDL carried in a NestedRowData
     * header, and (in multiple-sink mode) canonical JSON rows fanned out per RowKind.
     */
    @Override
    public synchronized void invoke(T value, Context context) throws Exception {
        long start = System.nanoTime();
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            // Exactly-once: data buffered at the previous checkpoint is flushed first.
            flushPreviousState();
        }
        if (null == serializer) {
            if (value instanceof StarRocksSinkRowDataWithMeta) {
                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
                if (Strings.isNullOrEmpty(data.getDatabase()) || Strings.isNullOrEmpty(data.getTable())
                        || null == data.getDataRows()) {
                    LOG.warn(String.format("json row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
                            data.getDatabase(), data.getTable(), data.getDataRows()));
                    return;
                }
                sinkManager.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
                return;
            }
            // raw data sink
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), (String) value);
            totalInvokeRows.inc(1);
            totalInvokeRowsTime.inc(System.nanoTime() - start);
            return;
        }
        if (value instanceof NestedRowData) {
            // DDL path: the payload is a serialized map carried after a fixed-size header.
            // NOTE(review): headerSize=256 matches the producer's layout — confirm upstream.
            final int headerSize = 256;
            NestedRowData ddlData = (NestedRowData) value;
            if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < headerSize) {
                return;
            }
            int totalSize = ddlData.getSegments()[0].size();
            byte[] data = new byte[totalSize - headerSize];
            ddlData.getSegments()[0].get(headerSize, data);
            Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
            if (null == ddlMap || "true".equals(ddlMap.get("snapshot")) || Strings.isNullOrEmpty(ddlMap.get("ddl"))
                    || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
                return;
            }
            Statement stmt = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
            if (stmt instanceof Truncate) {
                Truncate truncate = (Truncate) stmt;
                if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
                    return;
                }
                // TODO: add ddl to queue
            } else if (stmt instanceof Alter) {
                // NOTE(review): ALTER statements are currently recognized but not acted upon.
                Alter alter = (Alter) stmt;
            }
        }
        if (value instanceof RowData) {
            if (RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
                // do not need update_before, cauz an update action happened on the primary keys will be separated into
                // `delete` and `create`
                return;
            }
            if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) value).getRowKind())) {
                // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
                return;
            }
        }

        if (multipleSink) {
            // Multiple-sink mode: decode the canonical JSON payload, resolve the target
            // database/table by pattern, and emit one record per applicable RowKind.
            GenericRowData rowData = (GenericRowData) value;
            if (jsonDynamicSchemaFormat == null) {
                jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(
                        this.sinkMultipleFormat);
            }
            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize((byte[]) rowData.getField(0));
            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
            if (isDDL) {
                // Ignore ddl change for now
                return;
            }
            String databaseName = jsonDynamicSchemaFormat.parse(rootNode, databasePattern);
            String tableName = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);

            List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
                    jsonDynamicSchemaFormat.getOpType(rootNode));
            List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
                    jsonDynamicSchemaFormat.getPhysicalData(rootNode));
            JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
            List<Map<String, String>> updateBeforeList = null;
            if (updateBeforeNode != null) {
                updateBeforeList = jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
            }
            for (int i = 0; i < physicalDataList.size(); i++) {
                for (RowKind rowKind : rowKinds) {
                    String record = null;
                    // The __op pseudo-column tells StarRocks whether to upsert or delete.
                    switch (rowKind) {
                        case INSERT:
                        case UPDATE_AFTER:
                            physicalDataList.get(i).put("__op", String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
                            record = JSON.toJSONString(physicalDataList.get(i));
                            break;
                        case DELETE:
                            physicalDataList.get(i).put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
                            record = JSON.toJSONString(physicalDataList.get(i));
                            break;
                        case UPDATE_BEFORE:
                            if (updateBeforeList != null && updateBeforeList.size() > i) {
                                updateBeforeList.get(i).put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
                                record = JSON.toJSONString(updateBeforeList.get(i));
                            }
                            break;
                        default:
                            throw new RuntimeException("Unrecognized row kind:" + rowKind);
                    }
                    if (StringUtils.isNotBlank(record)) {
                        sinkManager.writeRecords(databaseName, tableName, record);
                    }
                }
            }
        } else {
            // Single-table mode: transform and serialize via the configured serializer.
            String record = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
            sinkManager.writeRecords(sinkOptions.getDatabaseName(), sinkOptions.getTableName(), record);
        }

        totalInvokeRows.inc(1);
        totalInvokeRowsTime.inc(System.nanoTime() - start);
    }

    /**
     * Restores InLong metric state and, for exactly-once semantics only, the buffered-rows
     * operator state used to replay unflushed batches.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.inlongMetric != null) {
            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>(INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
                    })));
        }
        if (context.isRestored()) {
            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
        }

        if (!StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            return;
        }
        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> descriptor = new ListStateDescriptor<>(
                "buffered-rows", TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
                }));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    }

    /**
     * Snapshots metric state, then either saves the current buffer (exactly-once) or
     * flushes everything immediately (at-least-once).
     */
    @Override
    public synchronized void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (metricData != null && metricStateListState != null) {
            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
                    getRuntimeContext().getIndexOfThisSubtask());
        }

        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            flushPreviousState();
            // save state
            checkpointedState.add(sinkManager.getBufferedBatchMap());
            return;
        }
        sinkManager.flush(null, true);
    }

    // @Override
    // NOTE(review): finish() overrides a SinkFunction method that only exists in newer
    // Flink versions; @Override and super.finish() are commented out, presumably to keep
    // this compiling against the older Flink this module targets — TODO confirm.
    public synchronized void finish() throws Exception {
        // super.finish();
        LOG.info("StarRocks sink is draining the remaining data.");
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(sinkOptions.getSemantic())) {
            flushPreviousState();
        }
        sinkManager.flush(null, true);
    }

    /** Closes the sink manager (stops the flush thread and releases resources). */
    @Override
    public synchronized void close() throws Exception {
        super.close();
        sinkManager.close();
    }

    /** Replays and flushes every buffered-batch map saved at the previous checkpoint. */
    private void flushPreviousState() throws Exception {
        // flush the batch saved at the previous checkpoint
        for (Map<String, StarRocksSinkBufferEntity> state : checkpointedState.get()) {
            sinkManager.setBufferedBatchMap(state);
            sinkManager.flush(null, true);
        }
        checkpointedState.clear();
    }
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
new file mode 100644
index 000000000..630a49213
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+public class StarRocksDynamicTableSink implements DynamicTableSink {
+
+ private transient TableSchema flinkSchema;
+ private StarRocksSinkOptions sinkOptions;
+ private final boolean multipleSink;
+ private final String sinkMultipleFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+
+ public StarRocksDynamicTableSink(StarRocksSinkOptions sinkOptions,
+ TableSchema schema,
+ boolean multipleSink,
+ String sinkMultipleFormat,
+ String databasePattern,
+ String tablePattern,
+ String inlongMetric,
+ String auditHostAndPorts,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+ this.flinkSchema = schema;
+ this.sinkOptions = sinkOptions;
+ this.multipleSink = multipleSink;
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return requestedMode;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(flinkSchema.toRowDataType());
+ StarRocksDynamicSinkFunction<RowData> starrocksSinkFunction = new
StarRocksDynamicSinkFunction<>(sinkOptions,
+ flinkSchema,
+ new StarRocksTableRowTransformer(rowDataTypeInfo),
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ inlongMetric,
+ auditHostAndPorts,
+ schemaUpdatePolicy);
+ return SinkFunctionProvider.of(starrocksSinkFunction,
sinkOptions.getSinkParallelism());
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new StarRocksDynamicTableSink(sinkOptions,
+ flinkSchema,
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ inlongMetric,
+ auditHostAndPorts,
+ schemaUpdatePolicy);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "starrocks_sink";
+ }
+}
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
new file mode 100644
index 000000000..b3fc5ea5e
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
+public class StarRocksDynamicTableSinkFactory implements
DynamicTableSinkFactory {
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX);
+ ReadableConfig options = helper.getOptions();
+ // validate some special properties
+ StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options,
context.getCatalogTable().getOptions());
+ sinkOptions.enableUpsertDelete();
+ TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+ String sinkMultipleFormat =
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
+ String databasePattern =
helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
+ String tablePattern =
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy =
helper.getOptions().get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ String inlongMetric =
helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+ String auditHostAndPorts =
helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+
+ validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern);
+
+ return new StarRocksDynamicTableSink(sinkOptions,
+ physicalSchema,
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ inlongMetric,
+ auditHostAndPorts,
+ schemaUpdatePolicy);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "starrocks-inlong";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(StarRocksSinkOptions.JDBC_URL);
+ requiredOptions.add(StarRocksSinkOptions.LOAD_URL);
+ requiredOptions.add(StarRocksSinkOptions.DATABASE_NAME);
+ requiredOptions.add(StarRocksSinkOptions.TABLE_NAME);
+ requiredOptions.add(StarRocksSinkOptions.USERNAME);
+ requiredOptions.add(StarRocksSinkOptions.PASSWORD);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE);
+ optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_MAX_ROWS);
+ optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL);
+ optionalOptions.add(StarRocksSinkOptions.SINK_MAX_RETRIES);
+ optionalOptions.add(StarRocksSinkOptions.SINK_SEMANTIC);
+ optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT);
+ optionalOptions.add(StarRocksSinkOptions.SINK_PARALLELISM);
+ optionalOptions.add(StarRocksSinkOptions.SINK_LABEL_PREFIX);
+ optionalOptions.add(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT);
+ optionalOptions.add(SINK_MULTIPLE_FORMAT);
+ optionalOptions.add(SINK_MULTIPLE_DATABASE_PATTERN);
+ optionalOptions.add(SINK_MULTIPLE_TABLE_PATTERN);
+ optionalOptions.add(SINK_MULTIPLE_ENABLE);
+ optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ optionalOptions.add(INLONG_METRIC);
+ optionalOptions.add(INLONG_AUDIT);
+ return optionalOptions;
+ }
+
+ private void validateSinkMultiple(DataType physicalDataType, boolean
multipleSink, String sinkMultipleFormat,
+ String databasePattern, String tablePattern) {
+ if (multipleSink) {
+ if (StringUtils.isBlank(databasePattern)) {
+ throw new ValidationException("The option
'sink.multiple.database-pattern'"
+ + " is not allowed blank when the option
'sink.multiple.enable' is 'true'");
+ }
+ if (StringUtils.isBlank(tablePattern)) {
+ throw new ValidationException("The option
'sink.multiple.table-pattern' "
+ + "is not allowed blank when the option
'sink.multiple.enable' is 'true'");
+ }
+ if (StringUtils.isBlank(sinkMultipleFormat)) {
+ throw new ValidationException("The option
'sink.multiple.format' "
+ + "is not allowed blank when the option
'sink.multiple.enable' is 'true'");
+ }
+ DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ Set<String> supportFormats =
DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
+ if (!supportFormats.contains(sinkMultipleFormat)) {
+ throw new ValidationException(
+ String.format("Unsupported value '%s' for '%s'. " +
"Supported values are %s.",
+ sinkMultipleFormat,
SINK_MULTIPLE_FORMAT.key(), supportFormats));
+ }
+ if (physicalDataType.getLogicalType() instanceof VarBinaryType) {
+ throw new ValidationException("Only supports 'BYTES' or
'VARBINARY(n)' of PhysicalDataType "
+ + "when the option 'sink.multiple.enable' is 'true'");
+ }
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..fe39d4a26
--- /dev/null
+++
b/inlong-sort/sort-connectors/starrocks/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.starrocks.table.sink.StarRocksDynamicTableSinkFactory
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index cf629fe6a..507561321 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -233,6 +233,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-starrocks</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 5a0d591b5..b268d8223 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -612,6 +612,15 @@
Source : org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 (Please
note that the software have been modified.)
License :
https://github.com/apache/doris-flink-connector/blob/1.13_2.11-1.0.3/LICENSE.txt
+1.3.13
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksStreamLoadVisitor.java
+
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSink.java
+
inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+
+ Source : com.starrocks:flink-connector-starrocks:1.2.3_flink-1.13_2.11
(Please note that the software have been modified.)
+ License : https://www.apache.org/licenses/LICENSE-2.0.txt
+
=======================================================================
Apache InLong Subcomponents: