This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e6caf705 [Feature][Connector-V2] Starrocks sink connector (#3164)
3e6caf705 is described below
commit 3e6caf7053b3675766e201eb3b69b35a2833c304
Author: Bibo <[email protected]>
AuthorDate: Mon Nov 7 19:14:48 2022 +0800
[Feature][Connector-V2] Starrocks sink connector (#3164)
* [Feature][Connector-V2] Add starrocks connector sink
* [Feature][Connector-V2] StarRocks sink connector
(StarRocks stream load API)
* improve StarRocksFlushTuple
* add Changelog
* delete useless log4j file
* change sr e2e port
---
docs/en/connector-v2/sink/StarRocks.md | 128 +++++++++
plugin-mapping.properties | 1 +
.../connector-starrocks}/pom.xml | 48 ++--
.../seatunnel/starrocks/client/HttpHelper.java | 163 +++++++++++
.../starrocks/client/StarRocksFlushTuple.java | 33 +++
.../starrocks/client/StarRocksSinkManager.java | 157 +++++++++++
.../client/StarRocksStreamLoadFailedException.java | 49 ++++
.../client/StarRocksStreamLoadVisitor.java | 225 +++++++++++++++
.../seatunnel/starrocks/config/SinkConfig.java | 139 ++++++++++
.../serialize/StarRocksBaseSerializer.java | 76 ++++++
.../serialize/StarRocksCsvSerializer.java | 47 ++++
.../serialize/StarRocksDelimiterParser.java | 73 +++++
.../starrocks/serialize/StarRocksISerializer.java | 27 ++
.../serialize/StarRocksJsonSerializer.java | 49 ++++
.../seatunnel/starrocks/sink/StarRocksSink.java | 76 ++++++
.../starrocks/sink/StarRocksSinkWriter.java | 89 ++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../{ => connector-starrocks-e2e}/pom.xml | 28 +-
.../e2e/connector/starrocks/StarRocksIT.java | 303 +++++++++++++++++++++
.../resources/starrocks-jdbc-to-starrocks.conf | 47 ++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
22 files changed, 1721 insertions(+), 45 deletions(-)
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
new file mode 100644
index 000000000..2131cdc0d
--- /dev/null
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -0,0 +1,128 @@
+# StarRocks
+
+> StarRocks sink connector
+
+## Description
+Used to send data to StarRocks. Both streaming and batch mode are supported.
+The internal implementation of StarRocks sink connector is cached and imported
by stream load in batches.
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required |
default value |
+|-----------------------------|------------------------------|----------|-----------------|
+| node_urls | list | yes | -
|
+| username | string | yes | -
|
+| password | string | yes | -
|
+| database | string | yes | -
|
+| table | string | no | -
|
+| labelPrefix | string | no | -
|
+| batch_max_rows | long | no | 1024
|
+| batch_max_bytes | int | no | 5 *
1024 * 1024 |
+| batch_interval_ms | int | no | -
|
+| max_retries | int | no | -
|
+| retry_backoff_multiplier_ms | int | no | -
|
+| max_retry_backoff_ms | int | no | -
|
+| sink.properties.* | starrocks stream load config | no | -
|
+
+### node_urls [list]
+
+`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
+
+### username [string]
+
+`StarRocks` username used for authentication
+
+### password [string]
+
+`StarRocks` user password
+
+### database [string]
+
+The name of StarRocks database
+
+### table [string]
+
+The name of StarRocks table
+
+### labelPrefix [string]
+
+the prefix of StarRocks stream load label
+
+### batch_max_rows [long]
+
+For batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`batch_interval_ms`, the data will be flushed into the StarRocks
+
+### batch_max_bytes [int]
+
+For batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`batch_interval_ms`, the data will be flushed into the StarRocks
+
+### batch_interval_ms [int]
+
+For batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`batch_interval_ms`, the data will be flushed into the StarRocks
+
+### max_retries [int]
+
+The number of retries to flush failed
+
+### retry_backoff_multiplier_ms [int]
+
+Using as a multiplier for generating the next delay for backoff
+
+### max_retry_backoff_ms [int]
+
+The amount of time to wait before attempting to retry a request to `StarRocks`
+
+### sink.properties.* [starrocks stream load config]
+
+the parameter of the stream load `data_desc`
+The way to specify the parameter is to add the prefix `sink.properties.` to
the original stream load parameter name.
+For example, the way to specify `strip_outer_array` is:
`sink.properties.strip_outer_array`.
+
+#### Supported import data formats
+
+The supported formats include CSV and JSON. Default value: CSV
+
+## Example
+Use JSON format to import data
+```
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "e2e_table_sink"
+ batch_max_rows = 10
+ sink.properties.format = "JSON"
+ sink.properties.strip_outer_array = true
+ }
+}
+
+```
+
+Use CSV format to import data
+```
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "e2e_table_sink"
+ batch_max_rows = 10
+ sink.properties.format = "CSV"
+ sink.properties.column_separator = "\\x01",
+ sink.properties.row_delimiter = "\\x02"
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add StarRocks Sink Connector
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1555c1c14..11d7fd4a9 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -139,3 +139,4 @@ seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
+seatunnel.sink.StarRocks = connector-starrocks
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-connectors-v2/connector-starrocks/pom.xml
similarity index 57%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-connectors-v2/connector-starrocks/pom.xml
index ced7386dd..f7992271b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -1,60 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
+
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+
http://www.apache.org/licenses/LICENSE-2.0
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
+
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
- <artifactId>seatunnel-e2e</artifactId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <modules>
- <module>connector-assert-e2e</module>
- <module>connector-jdbc-e2e</module>
- <module>connector-redis-e2e</module>
- <module>connector-clickhouse-e2e</module>
- <module>connector-influxdb-e2e</module>
- <module>connector-amazondynamodb-e2e</module>
- <module>connector-file-local-e2e</module>
- </modules>
- <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <artifactId>connector-starrocks</artifactId>
+
+ <properties>
+ <httpclient.version>4.5.13</httpclient.version>
+ <httpcore.version>4.4.4</httpcore.version>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e-common</artifactId>
+ <artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-starter</artifactId>
+ <artifactId>connector-common</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-starter</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
</dependency>
</dependencies>
-
</project>
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
new file mode 100644
index 000000000..1d2ad3fba
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class HttpHelper {
+ private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
+
+ public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
+ int code = resp.getStatusLine().getStatusCode();
+ if (HttpStatus.SC_OK != code) {
+ log.warn("Request failed with code:{}", code);
+ return null;
+ }
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ log.warn("Request failed with empty response.");
+ return null;
+ }
+ return respEntity;
+ }
+
+ public String doHttpGet(String getUrl) throws IOException {
+ log.info("Executing GET from {}.", getUrl);
+ try (CloseableHttpClient httpclient = buildHttpClient()) {
+ HttpGet httpGet = new HttpGet(getUrl);
+ try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ log.warn("Request failed with empty response.");
+ return null;
+ }
+ return EntityUtils.toString(respEntity);
+ }
+ }
+ }
+
+ public Map<String, Object> doHttpGet(String getUrl, Map<String, String>
header) throws IOException {
+ log.info("Executing GET from {}.", getUrl);
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ HttpGet httpGet = new HttpGet(getUrl);
+ if (null != header) {
+ for (Map.Entry<String, String> entry : header.entrySet()) {
+ httpGet.setHeader(entry.getKey(),
String.valueOf(entry.getValue()));
+ }
+ }
+ try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
+ HttpEntity respEntity = getHttpEntity(resp);
+ if (null == respEntity) {
+ log.warn("Request failed with empty response.");
+ return null;
+ }
+ return JsonUtils.parseObject(EntityUtils.toString(respEntity),
Map.class);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> doHttpPut(String url, byte[] data, Map<String,
String> header) throws IOException {
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ });
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ HttpPut httpPut = new HttpPut(url);
+ if (null != header) {
+ for (Map.Entry<String, String> entry : header.entrySet()) {
+ httpPut.setHeader(entry.getKey(),
String.valueOf(entry.getValue()));
+ }
+ }
+ httpPut.setEntity(new ByteArrayEntity(data));
+
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+ int code = resp.getStatusLine().getStatusCode();
+ if (HttpStatus.SC_OK != code) {
+ String errorText;
+ try {
+ HttpEntity respEntity = resp.getEntity();
+ errorText = EntityUtils.toString(respEntity);
+ } catch (Exception err) {
+ errorText = "find errorText failed: " +
err.getMessage();
+ }
+ log.warn("Request failed with code:{}, err:{}", code,
errorText);
+ Map<String, Object> errorMap = new HashMap<>();
+ errorMap.put("Status", "Fail");
+ errorMap.put("Message", errorText);
+ return errorMap;
+ }
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ log.warn("Request failed with empty response.");
+ return null;
+ }
+ return JsonUtils.parseObject(EntityUtils.toString(respEntity),
Map.class);
+ }
+ }
+ }
+
+ private CloseableHttpClient buildHttpClient() {
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ });
+ return httpClientBuilder.build();
+ }
+
+ public boolean tryHttpConnection(String host) {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e1) {
+ log.warn("Failed to connect to address:{}", host, e1);
+ return false;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
new file mode 100644
index 000000000..66ae222b3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+@AllArgsConstructor
+@Getter
+@Setter
+public class StarRocksFlushTuple {
+ private String label;
+ private Long bytes;
+ private List<byte[]> rows;
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
new file mode 100644
index 000000000..2f67e4fea
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class StarRocksSinkManager {
+
+ private final SinkConfig sinkConfig;
+ private final List<byte[]> batchList;
+
+ private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> scheduledFuture;
+ private volatile boolean initialize;
+ private volatile Exception flushException;
+ private int batchRowCount = 0;
+ private long batchBytesSize = 0;
+
+ private Integer batchIntervalMs;
+
+ public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames)
{
+ this.sinkConfig = sinkConfig;
+ this.batchList = new ArrayList<>();
+ this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
+ starrocksStreamLoadVisitor = new
StarRocksStreamLoadVisitor(sinkConfig, fileNames);
+ }
+
+ private void tryInit() throws IOException {
+ if (initialize) {
+ return;
+ }
+ initialize = true;
+
+ if (batchIntervalMs != null) {
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("StarRocks-sink-output-%s").build());
+ scheduledFuture = scheduler.scheduleAtFixedRate(
+ () -> {
+ try {
+ flush();
+ } catch (IOException e) {
+ flushException = e;
+ }
+ },
+ batchIntervalMs,
+ batchIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public synchronized void write(String record) throws IOException {
+ tryInit();
+ checkFlushException();
+ byte[] bts = record.getBytes(StandardCharsets.UTF_8);
+ batchList.add(bts);
+ batchRowCount++;
+ batchBytesSize += bts.length;
+ if (batchRowCount >= sinkConfig.getBatchMaxSize() || batchBytesSize >=
sinkConfig.getBatchMaxBytes()) {
+ flush();
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+
+ flush();
+ }
+
+ public synchronized void flush() throws IOException {
+ checkFlushException();
+ if (batchList.isEmpty()) {
+ return;
+ }
+ StarRocksFlushTuple tuple = null;
+ for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
+ try {
+ String label = createBatchLabel();
+ tuple = new StarRocksFlushTuple(label, batchBytesSize, new
ArrayList<>(batchList));
+ starrocksStreamLoadVisitor.doStreamLoad(tuple);
+ } catch (Exception e) {
+
+ log.error("Writing records to StarRocks failed, retry times =
{}", i, e);
+ if (i >= sinkConfig.getMaxRetries()) {
+ throw new IOException("Writing records to StarRocks
failed.", e);
+ }
+
+ if (e instanceof StarRocksStreamLoadFailedException &&
((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
+ String newLabel = createBatchLabel();
+ log.warn(String.format("Batch label changed from [%s] to
[%s]", tuple.getLabel(), newLabel));
+ tuple.setLabel(newLabel);
+ }
+
+ try {
+ long backoff =
Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
+ sinkConfig.getMaxRetryBackoffMs());
+ Thread.sleep(backoff);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Unable to flush; interrupted while doing another
attempt.", e);
+ }
+ }
+ }
+ batchList.clear();
+ batchRowCount = 0;
+ batchBytesSize = 0;
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to StarRocks failed.",
flushException);
+ }
+ }
+
+ public String createBatchLabel() {
+ StringBuilder sb = new StringBuilder();
+ if (!Strings.isNullOrEmpty(sinkConfig.getLabelPrefix())) {
+ sb.append(sinkConfig.getLabelPrefix());
+ }
+ return sb.append(UUID.randomUUID().toString())
+ .toString();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
new file mode 100644
index 000000000..626b38d3f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class StarRocksStreamLoadFailedException extends IOException {
+
+ static final long serialVersionUID = 1L;
+
+ private final Map<String, Object> response;
+ private boolean reCreateLabel;
+
+ public StarRocksStreamLoadFailedException(String message, Map<String,
Object> response) {
+ super(message);
+ this.response = response;
+ }
+
+ public StarRocksStreamLoadFailedException(String message, Map<String,
Object> response, boolean reCreateLabel) {
+ super(message);
+ this.response = response;
+ this.reCreateLabel = reCreateLabel;
+ }
+
+ public Map<String, Object> getFailedResponse() {
+ return response;
+ }
+
+ public boolean needReCreateLabel() {
+ return reCreateLabel;
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
new file mode 100644
index 000000000..58efb8726
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StarRocksStreamLoadVisitor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
+
+ private final HttpHelper httpHelper = new HttpHelper();
+ private static final int MAX_SLEEP_TIME = 5;
+
+ private final SinkConfig sinkConfig;
+ private long pos;
+ private static final String RESULT_FAILED = "Fail";
+ private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
+ private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
+ private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
+ private static final String RESULT_LABEL_PREPARE = "PREPARE";
+ private static final String RESULT_LABEL_ABORTED = "ABORTED";
+ private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
+
+ private List<String> fieldNames;
+
+ public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String>
fieldNames) {
+ this.sinkConfig = sinkConfig;
+ this.fieldNames = fieldNames;
+ }
+
+ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException
{
+ String host = getAvailableHost();
+ if (null == host) {
+ throw new IOException("None of the host in `load_url` could be
connected.");
+ }
+ String loadUrl = new StringBuilder(host)
+ .append("/api/")
+ .append(sinkConfig.getDatabase())
+ .append("/")
+ .append(sinkConfig.getTable())
+ .append("/_stream_load")
+ .toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Start to join batch data: rows[%d]
bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(),
flushData.getLabel()));
+ }
+ Map<String, Object> loadResult = httpHelper.doHttpPut(loadUrl,
joinRows(flushData.getRows(), flushData.getBytes().intValue()),
getStreamLoadHttpHeader(flushData.getLabel()));
+ final String keyStatus = "Status";
+ if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+ LOG.error("unknown result status. {}", loadResult);
+ throw new IOException("Unable to flush data to StarRocks: unknown
result status. " + loadResult);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(new StringBuilder("StreamLoad
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
+ }
+ if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
+ StringBuilder errorBuilder = new StringBuilder("Failed to flush
data to StarRocks.\n");
+ if (loadResult.containsKey("Message")) {
+ errorBuilder.append(loadResult.get("Message"));
+ errorBuilder.append('\n');
+ }
+ if (loadResult.containsKey("ErrorURL")) {
+ LOG.error("StreamLoad response: {}", loadResult);
+ try {
+
errorBuilder.append(httpHelper.doHttpGet(loadResult.get("ErrorURL").toString()));
+ errorBuilder.append('\n');
+ } catch (IOException e) {
+ LOG.warn("Get Error URL failed. {} ",
loadResult.get("ErrorURL"), e);
+ }
+ } else {
+ errorBuilder.append(JsonUtils.toJsonString(loadResult));
+ errorBuilder.append('\n');
+ }
+ throw new IOException(errorBuilder.toString());
+ } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
+ LOG.debug(new StringBuilder("StreamLoad
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
+ // has to block-checking the state to get the final result
+ checkLabelState(host, flushData.getLabel());
+ }
+ }
+
+ private String getAvailableHost() {
+ List<String> hostList = sinkConfig.getNodeUrls();
+ long tmp = pos + hostList.size();
+ for (; pos < tmp; pos++) {
+ String host = new
StringBuilder("http://").append(hostList.get((int) (pos %
hostList.size()))).toString();
+ if (httpHelper.tryHttpConnection(host)) {
+ return host;
+ }
+ }
+ return null;
+ }
+
+ private byte[] joinRows(List<byte[]> rows, int totalBytes) {
+ if
(SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
+ Map<String, Object> props = sinkConfig.getStreamLoadProps();
+ byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)
props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() *
lineDelimiter.length);
+ for (byte[] row : rows) {
+ bos.put(row);
+ bos.put(lineDelimiter);
+ }
+ return bos.array();
+ }
+
+ if
(SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty()
? 2 : rows.size() + 1));
+ bos.put("[".getBytes(StandardCharsets.UTF_8));
+ byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+ boolean isFirstElement = true;
+ for (byte[] row : rows) {
+ if (!isFirstElement) {
+ bos.put(jsonDelimiter);
+ }
+ bos.put(row);
+ isFirstElement = false;
+ }
+ bos.put("]".getBytes(StandardCharsets.UTF_8));
+ return bos.array();
+ }
+ throw new RuntimeException("Failed to join rows data, unsupported
`format` from stream load properties:");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkLabelState(String host, String label) throws IOException
{
+ int idx = 0;
+ while (true) {
+ try {
+ TimeUnit.SECONDS.sleep(Math.min(++idx, MAX_SLEEP_TIME));
+ } catch (InterruptedException ex) {
+ break;
+ }
+ try {
+ String queryLoadStateUrl = new
StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString();
+ Map<String, Object> result =
httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label));
+ if (result == null) {
+ throw new IOException(String.format("Failed to flush data
to StarRocks, Error " +
+ "could not get the final state of label[%s].\n",
label), null);
+ }
+ String labelState = (String) result.get("state");
+ if (null == labelState) {
+ throw new IOException(String.format("Failed to flush data
to StarRocks, Error " +
+ "could not get the final state of label[%s].
response[%s]\n", label, JsonUtils.toJsonString(result)), null);
+ }
+ LOG.info(String.format("Checking label[%s] state[%s]\n",
label, labelState));
+ switch (labelState) {
+ case LAEBL_STATE_VISIBLE:
+ case LAEBL_STATE_COMMITTED:
+ return;
+ case RESULT_LABEL_PREPARE:
+ continue;
+ case RESULT_LABEL_ABORTED:
+ throw new
StarRocksStreamLoadFailedException(String.format("Failed to flush data to
StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState),
null, true);
+ case RESULT_LABEL_UNKNOWN:
+ default:
+ throw new
StarRocksStreamLoadFailedException(String.format("Failed to flush data to
StarRocks, Error " +
+ "label[%s] state[%s]\n", label, labelState),
null);
+ }
+ } catch (IOException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private String getBasicAuthHeader(String username, String password) {
+ String auth = username + ":" + password;
+ byte[] encodedAuth =
Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+ return new StringBuilder("Basic ").append(new
String(encodedAuth)).toString();
+ }
+
+ private Map<String, String> getStreamLoadHttpHeader(String label) {
+ Map<String, String> headerMap = new HashMap<>();
+ if (null != fieldNames && !fieldNames.isEmpty() &&
SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
+ headerMap.put("columns", String.join(",",
fieldNames.stream().map(f -> String.format("`%s`",
f)).collect(Collectors.toList())));
+ }
+ if (null != sinkConfig.getStreamLoadProps()) {
+ for (Map.Entry<String, Object> entry :
sinkConfig.getStreamLoadProps().entrySet()) {
+ headerMap.put(entry.getKey(),
String.valueOf(entry.getValue()));
+ }
+ }
+ headerMap.put("strip_outer_array", "true");
+ headerMap.put("Expect", "100-continue");
+ headerMap.put("label", label);
+ headerMap.put("Content-Type", "application/x-www-form-urlencoded");
+ headerMap.put("Authorization",
getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
+ return headerMap;
+ }
+
+ private Map<String, String> getLoadStateHttpHeader(String label) {
+ Map<String, String> headerMap = new HashMap<>();
+ headerMap.put("Authorization",
getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
+ headerMap.put("Connection", "close");
+ return headerMap;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
new file mode 100644
index 000000000..0c04c372d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Setter
+@Getter
+@ToString
+public class SinkConfig {
+
+ public static final String NODE_URLS = "nodeUrls";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String LABEL_PREFIX = "labelPrefix";
+ public static final String DATABASE = "database";
+ public static final String TABLE = "table";
+ public static final String STARROCKS_SINK_CONFIG_PREFIX =
"sink.properties.";
+ private static final String LOAD_FORMAT = "format";
+ private static final StreamLoadFormat DEFAULT_LOAD_FORMAT =
StreamLoadFormat.CSV;
+ private static final String COLUMN_SEPARATOR = "column_separator";
+ public static final String BATCH_MAX_SIZE = "batch_max_rows";
+ public static final String BATCH_MAX_BYTES = "batch_max_bytes";
+ public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+ public static final String MAX_RETRIES = "max_retries";
+ public static final String RETRY_BACKOFF_MULTIPLIER_MS =
"retry_backoff_multiplier_ms";
+ public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
+
+ public enum StreamLoadFormat {
+ CSV, JSON;
+ public static StreamLoadFormat parse(String format) {
+ if (StreamLoadFormat.JSON.name().equals(format)) {
+ return JSON;
+ }
+ return CSV;
+ }
+ }
+
+ private List<String> nodeUrls;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+ private String labelPrefix;
+ private String columnSeparator;
+ private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT;
+ private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
+ private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
+
+ private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE;
+ private long batchMaxBytes = DEFAULT_BATCH_BYTES;
+
+ private Integer batchIntervalMs;
+ private int maxRetries;
+ private int retryBackoffMultiplierMs;
+ private int maxRetryBackoffMs;
+
+ private final Map<String, Object> streamLoadProps = new HashMap<>();
+
+ public static SinkConfig loadConfig(Config pluginConfig) {
+ SinkConfig sinkConfig = new SinkConfig();
+ sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS));
+ sinkConfig.setDatabase(pluginConfig.getString(DATABASE));
+ sinkConfig.setTable(pluginConfig.getString(TABLE));
+
+ if (pluginConfig.hasPath(USERNAME)) {
+ sinkConfig.setUsername(pluginConfig.getString(USERNAME));
+ }
+ if (pluginConfig.hasPath(PASSWORD)) {
+ sinkConfig.setPassword(pluginConfig.getString(PASSWORD));
+ }
+ if (pluginConfig.hasPath(LABEL_PREFIX)) {
+ sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX));
+ }
+ if (pluginConfig.hasPath(COLUMN_SEPARATOR)) {
+
sinkConfig.setColumnSeparator(pluginConfig.getString(COLUMN_SEPARATOR));
+ }
+ if (pluginConfig.hasPath(BATCH_MAX_SIZE)) {
+ sinkConfig.setBatchMaxSize(pluginConfig.getInt(BATCH_MAX_SIZE));
+ }
+ if (pluginConfig.hasPath(BATCH_MAX_BYTES)) {
+ sinkConfig.setBatchMaxBytes(pluginConfig.getLong(BATCH_MAX_BYTES));
+ }
+ if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) {
+
sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS));
+ }
+ if (pluginConfig.hasPath(MAX_RETRIES)) {
+ sinkConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES));
+ }
+ if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
+
sinkConfig.setRetryBackoffMultiplierMs(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+ }
+ if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) {
+
sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS));
+ }
+ parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
+ if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
+ sinkConfig.setColumnSeparator((String)
sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
+ }
+ if (sinkConfig.streamLoadProps.containsKey(LOAD_FORMAT)) {
+ sinkConfig.setLoadFormat(StreamLoadFormat.parse((String)
sinkConfig.streamLoadProps.get(LOAD_FORMAT)));
+ }
+ return sinkConfig;
+ }
+
+ private static void parseSinkStreamLoadProperties(Config pluginConfig,
SinkConfig sinkConfig) {
+ Config starRocksConfig =
TypesafeConfigUtils.extractSubConfig(pluginConfig,
+ STARROCKS_SINK_CONFIG_PREFIX, false);
+ starRocksConfig.entrySet().forEach(entry -> {
+ final String configKey = entry.getKey().toLowerCase();
+ sinkConfig.streamLoadProps.put(configKey,
entry.getValue().unwrapped());
+ });
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
new file mode 100644
index 000000000..190b18b95
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+
+import lombok.Builder;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+public class StarRocksBaseSerializer {
+ @Builder.Default
+ private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+ @Builder.Default
+ private DateTimeUtils.Formatter dateTimeFormatter =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ @Builder.Default
+ private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+
+ protected String convert(SeaTunnelDataType dataType, Object val) {
+ if (val == null) {
+ return null;
+ }
+ switch (dataType.getSqlType()) {
+ case TINYINT:
+ case SMALLINT:
+ return String.valueOf(((Number) val).shortValue());
+ case INT:
+ return String.valueOf(((Number) val).intValue());
+ case BIGINT:
+ return String.valueOf(((Number) val).longValue());
+ case FLOAT:
+ return String.valueOf(((Number) val).floatValue());
+ case DOUBLE:
+ return String.valueOf(((Number) val).doubleValue());
+ case DECIMAL:
+ case BOOLEAN:
+ return val.toString();
+ case DATE:
+ return DateUtils.toString((LocalDate) val, dateFormatter);
+ case TIME:
+ return TimeUtils.toString((LocalTime) val, timeFormatter);
+ case TIMESTAMP:
+ return DateTimeUtils.toString((LocalDateTime) val,
dateTimeFormatter);
+ case STRING:
+ return (String) val;
+ case ARRAY:
+ case MAP:
+ return JsonUtils.toJsonString(val);
+ case BYTES:
+ return new String((byte[]) val);
+ default:
+ throw new UnsupportedOperationException("Unsupported dataType:
" + dataType);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java
new file mode 100644
index 000000000..191b615ba
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements
StarRocksISerializer {
+ private static final long serialVersionUID = 1L;
+
+ private final String columnSeparator;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public StarRocksCsvSerializer(String sp, SeaTunnelRowType
seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t");
+ }
+
+ @Override
+ public String serialize(SeaTunnelRow row) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < row.getFields().length; i++) {
+ String value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ sb.append(null == value ? "\\N" : value);
+ if (i < row.getFields().length - 1) {
+ sb.append(columnSeparator);
+ }
+ }
+ return sb.toString();
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
new file mode 100644
index 000000000..1b7ea726f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import com.google.common.base.Strings;
+
+import java.io.StringWriter;
+
/**
 * Resolves a column-separator specification into the literal separator string.
 * Specs starting with {@code \x} (case-insensitive) are treated as hex byte
 * sequences, e.g. {@code \x09} for a tab; anything else is used verbatim.
 */
public class StarRocksDelimiterParser {
    private static final int SHIFT = 4;

    private static final String HEX_STRING = "0123456789ABCDEF";

    /** Utility class; not instantiable. */
    private StarRocksDelimiterParser() {
    }

    /**
     * Parses a separator spec.
     *
     * @param sp  separator spec; may be null/empty
     * @param dSp default separator returned when {@code sp} is null or empty
     * @return the resolved separator
     * @throws IllegalArgumentException when a {@code \x...} spec has an empty,
     *         odd-length, or non-hex payload
     */
    public static String parse(String sp, String dSp) {
        if (sp == null || sp.isEmpty()) {
            return dSp;
        }
        if (!sp.toUpperCase().startsWith("\\X")) {
            return sp;
        }
        String hexStr = sp.substring(2);
        // Validate the hex payload before decoding.
        if (hexStr.isEmpty()) {
            throw new IllegalArgumentException("Failed to parse delimiter: `Hex str is empty`");
        }
        if (hexStr.length() % 2 != 0) {
            throw new IllegalArgumentException("Failed to parse delimiter: `Hex str length error`");
        }
        for (char hexChar : hexStr.toUpperCase().toCharArray()) {
            if (HEX_STRING.indexOf(hexChar) == -1) {
                throw new IllegalArgumentException("Failed to parse delimiter: `Hex str format error`");
            }
        }
        // Transform each decoded byte into a char of the separator.
        StringBuilder separator = new StringBuilder();
        for (byte b : hexStrToBytes(hexStr)) {
            separator.append((char) b);
        }
        return separator.toString();
    }

    /** Decodes an even-length hex string (already validated) into bytes. */
    private static byte[] hexStrToBytes(String hexStr) {
        String upperHexStr = hexStr.toUpperCase();
        int length = upperHexStr.length() / 2;
        char[] hexChars = upperHexStr.toCharArray();
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            int pos = i * 2;
            bytes[i] = (byte) (charToByte(hexChars[pos]) << SHIFT | charToByte(hexChars[pos + 1]));
        }
        return bytes;
    }

    /** Maps an upper-case hex digit to its numeric value. */
    private static byte charToByte(char c) {
        return (byte) HEX_STRING.indexOf(c);
    }
}
+
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java
new file mode 100644
index 000000000..bca39d20b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksISerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.Serializable;
+
/**
 * Converts a {@link SeaTunnelRow} into the textual payload format (CSV line or
 * JSON object) sent to StarRocks via stream load. Implementations must be
 * {@link Serializable} so writers holding them can be checkpointed.
 */
public interface StarRocksISerializer extends Serializable {

    /**
     * Serializes one row into its stream-load text representation.
     *
     * @param seaTunnelRow the row to serialize
     * @return textual form of the row
     */
    String serialize(SeaTunnelRow seaTunnelRow);
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
new file mode 100644
index 000000000..5e5edaa27
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+
+public class StarRocksJsonSerializer extends StarRocksBaseSerializer
implements StarRocksISerializer {
+
+ private static final long serialVersionUID = 1L;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public String serialize(SeaTunnelRow row) {
+ Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+
+ for (int i = 0; i < row.getFields().length; i++) {
+ String value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ rowMap.put(seaTunnelRowType.getFieldName(i), value);
+ }
+ return JsonUtils.toJsonString(rowMap);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
new file mode 100644
index 000000000..cf9c1093f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.NODE_URLS;
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private Config pluginConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public String getPluginName() {
+ return "StarRocks";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS, DATABASE, TABLE, USERNAME, PASSWORD);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ }
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
+ return new StarRocksSinkWriter(pluginConfig, seaTunnelRowType);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
new file mode 100644
index 000000000..441af0296
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow,
Void> {
+
+ private final StarRocksISerializer serializer;
+ private final StarRocksSinkManager manager;
+
+ public StarRocksSinkWriter(Config pluginConfig,
+ SeaTunnelRowType seaTunnelRowType) {
+ SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
+ List<String> fieldNames =
Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
+ this.serializer = createSerializer(sinkConfig, seaTunnelRowType);
+ this.manager = new StarRocksSinkManager(sinkConfig, fieldNames);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ String record = serializer.serialize(element);
+ manager.write(record);
+ }
+
+ @SneakyThrows
+ @Override
+ public Optional<Void> prepareCommit() {
+ // Flush to storage before snapshot state is performed
+ manager.flush();
+ return super.prepareCommit();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (manager != null) {
+ manager.close();
+ }
+ } catch (IOException e) {
+ log.error("Close starRocks manager failed.", e);
+ throw new IOException("Close starRocks manager failed.", e);
+ }
+ }
+
+ public static StarRocksISerializer createSerializer(SinkConfig sinkConfig,
SeaTunnelRowType seaTunnelRowType) {
+ if
(SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
+ return new StarRocksCsvSerializer(sinkConfig.getColumnSeparator(),
seaTunnelRowType);
+ }
+ if
(SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
+ return new StarRocksJsonSerializer(seaTunnelRowType);
+ }
+ throw new RuntimeException("Failed to create row serializer,
unsupported `format` from stream load properties.");
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 8ca99e023..d2fd18c6a 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -57,6 +57,7 @@
<module>connector-iceberg</module>
<module>connector-influxdb</module>
<module>connector-amazondynamodb</module>
+ <module>connector-starrocks</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 8994f378c..d29e0d706 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -303,6 +303,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-starrocks</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
similarity index 65%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
index ced7386dd..684b7e336 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
@@ -13,48 +13,36 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
- <artifactId>seatunnel-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <modules>
- <module>connector-assert-e2e</module>
- <module>connector-jdbc-e2e</module>
- <module>connector-redis-e2e</module>
- <module>connector-clickhouse-e2e</module>
- <module>connector-influxdb-e2e</module>
- <module>connector-amazondynamodb-e2e</module>
- <module>connector-file-local-e2e</module>
- </modules>
- <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <artifactId>connector-starrocks-e2e</artifactId>
<dependencies>
+ <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e-common</artifactId>
+ <artifactId>connector-starrocks</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-starter</artifactId>
+ <artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-starter</artifactId>
+ <artifactId>connector-jdbc-e2e</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
-
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
new file mode 100644
index 000000000..e25509927
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.starrocks;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class StarRocksIT extends TestSuiteBase implements TestResource {
+ private static final String DOCKER_IMAGE =
"d87904488/starrocks-starter:2.2.1";
+ private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ private static final String HOST = "e2e_starRocksdb";
+ private static final int SR_DOCKER_PORT = 9030;
+ private static final int SR_PORT = 9033;
+
+ private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private static final String DATABASE = "test";
+ private static final String SOURCE_TABLE = "e2e_table_source";
+ private static final String SINK_TABLE = "e2e_table_sink";
+ private static final String SR_DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+ private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL,
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL,
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
+
+ private static final String DDL_SOURCE = "create table " + DATABASE + "."
+ SOURCE_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\",\n" +
+ "\"in_memory\" = \"false\"," +
+ "\"in_memory\" = \"false\"," +
+ "\"storage_format\" = \"DEFAULT\"" +
+ ")";
+
+ private static final String DDL_SINK = "create table " + DATABASE + "." +
SINK_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\",\n" +
+ "\"in_memory\" = \"false\"," +
+ "\"in_memory\" = \"false\"," +
+ "\"storage_format\" = \"DEFAULT\"" +
+ ")";
+
+ private static final String INIT_DATA_SQL = "insert into " + DATABASE +
"." + SOURCE_TABLE + " (\n" +
+ " BIGINT_COL,\n" +
+ " LARGEINT_COL,\n" +
+ " SMALLINT_COL,\n" +
+ " TINYINT_COL,\n" +
+ " BOOLEAN_COL,\n" +
+ " DECIMAL_COL,\n" +
+ " DOUBLE_COL,\n" +
+ " FLOAT_COL,\n" +
+ " INT_COL,\n" +
+ " CHAR_COL,\n" +
+ " VARCHAR_11_COL,\n" +
+ " STRING_COL,\n" +
+ " DATETIME_COL,\n" +
+ " DATE_COL\n" +
+ ")values(\n" +
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+ ")";
+
+ private Connection jdbcConnection;
+ private GenericContainer<?> starRocksServer;
+ private static final List<SeaTunnelRow> TEST_DATASET =
generateTestDataSet();
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + SR_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ starRocksServer = new GenericContainer<>(DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ starRocksServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", SR_PORT, SR_DOCKER_PORT)));
+ Startables.deepStart(Stream.of(starRocksServer)).join();
+ log.info("StarRocks container started");
+ // wait for starrocks fully start
+ given().ignoreExceptions()
+ .await()
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initializeJdbcConnection);
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private static List<SeaTunnelRow> generateTestDataSet() {
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Long.valueOf(1123456),
+ Short.parseShort("1"),
+ Byte.parseByte("1"),
+ Boolean.FALSE,
+ BigDecimal.valueOf(2222243, 1),
+ Double.parseDouble("2222243.2222243"),
+ Float.parseFloat("222224"),
+ Integer.parseInt("1"),
+ "a",
+ "VARCHAR_COL",
+ "STRING_COL",
+ "2022-08-13 17:35:59",
+ "2022-08-13"
+ });
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (jdbcConnection != null) {
+ jdbcConnection.close();
+ }
+ if (starRocksServer != null) {
+ starRocksServer.close();
+ }
+ }
+
+ @TestTemplate
+ public void testStarRocksSink(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/starrocks-jdbc-to-starrocks.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ try {
+ assertHasData(SINK_TABLE);
+
+ String sourceSql = String.format("select * from %s.%s", DATABASE,
SOURCE_TABLE);
+ String sinkSql = String.format("select * from %s.%s", DATABASE,
SINK_TABLE);
+ List<String> columnList =
Arrays.stream(COLUMN_STRING.split(",")).map(x ->
x.trim()).collect(Collectors.toList());
+ Statement sourceStatement = jdbcConnection.createStatement();
+ Statement sinkStatement = jdbcConnection.createStatement();
+ ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+
Assertions.assertEquals(sourceResultSet.getMetaData().getColumnCount(),
sinkResultSet.getMetaData().getColumnCount());
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columnList) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
+ String sourceValue =
IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue =
IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("get starRocks connection error", e);
+ }
+ }
+
+ private void initializeJdbcConnection() throws SQLException,
ClassNotFoundException, MalformedURLException, InstantiationException,
IllegalAccessException {
+ URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new
URL(SR_DRIVER_JAR)}, StarRocksIT.class.getClassLoader());
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ Driver driver = (Driver)
urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
+ Properties props = new Properties();
+ props.put("user", USERNAME);
+ props.put("password", PASSWORD);
+ jdbcConnection = driver.connect(String.format(URL,
starRocksServer.getHost()), props);
+ }
+
+ private void initializeJdbcTable() {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ // create databases
+ statement.execute("create database test");
+ // create source table
+ statement.execute(DDL_SOURCE);
+ // create sink table
+ statement.execute(DDL_SINK);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing table failed!", e);
+ }
+ }
+
+ private void batchInsertData() {
+ List<SeaTunnelRow> rows = TEST_DATASET;
+ try {
+ jdbcConnection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
jdbcConnection.prepareStatement(INIT_DATA_SQL)) {
+ for (int i = 0; i < rows.size(); i++) {
+ for (int index = 0; index <
rows.get(i).getFields().length; index++) {
+ preparedStatement.setObject(index + 1,
rows.get(i).getFields()[index]);
+ }
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ jdbcConnection.commit();
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new RuntimeException("get connection error", exception);
+ }
+ }
+
+ private void assertHasData(String table) {
+ try (Statement statement = jdbcConnection.createStatement()) {
+ String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
+ ResultSet source = statement.executeQuery(sql);
+ Assertions.assertTrue(source.next());
+ } catch (Exception e) {
+ throw new RuntimeException("test starrocks server image error", e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf
new file mode 100644
index 000000000..3f8dfa33a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job: one parallel task reads all rows from the source table and
+# writes them to the StarRocks sink.
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+# Read the 14 test columns from test.e2e_table_source over StarRocks'
+# MySQL-compatible protocol (container-network alias e2e_starRocksdb:9030).
+source {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://e2e_starRocksdb:9030"
+ user = root
+ password = ""
+ query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL,
BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL,
VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from
`test`.`e2e_table_source`"
+ }
+}
+
+# No transformation: the test asserts a byte-for-byte round trip.
+transform {
+}
+
+# Write to test.e2e_table_sink, flushing every 10 rows as JSON.
+# NOTE(review): nodeUrls targets port 8030 — presumably the FE HTTP port used
+# for stream load, distinct from the 9030 query port above; confirm against
+# the connector documentation.
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "e2e_table_sink"
+ batch_max_rows = 10
+ sink.properties.format = "JSON"
+ sink.properties.strip_outer_array = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index ced7386dd..d5cdf69aa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -28,6 +28,7 @@
<module>connector-jdbc-e2e</module>
<module>connector-redis-e2e</module>
<module>connector-clickhouse-e2e</module>
+ <module>connector-starrocks-e2e</module>
<module>connector-influxdb-e2e</module>
<module>connector-amazondynamodb-e2e</module>
<module>connector-file-local-e2e</module>