This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 66b0f1e1d2 [Feature][Connector] add starrocks save_mode (#6029)
66b0f1e1d2 is described below
commit 66b0f1e1d294e60a04e68e66064b6fc945678d9f
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 12 14:37:52 2024 +0800
[Feature][Connector] add starrocks save_mode (#6029)
---
docs/en/connector-v2/sink/StarRocks.md | 95 ++++++++++++---
.../starrocks/catalog/StarRocksCatalog.java | 65 +++++++++-
.../starrocks/catalog/StarRocksCatalogFactory.java | 4 +-
.../seatunnel/starrocks/config/SinkConfig.java | 7 +-
.../starrocks/config/StarRocksSinkOptions.java | 24 ++--
.../seatunnel/starrocks/sink/StarRocksSink.java | 67 +++++------
.../starrocks/sink/StarRocksSinkFactory.java | 57 ++++++++-
.../starrocks/catalog/StarRocksCatalogTest.java | 2 +-
.../connector-starrocks-e2e/pom.xml | 9 ++
.../e2e/connector/starrocks/StarRocksIT.java | 131 ++++++++++++---------
.../starrocks-thrift-to-starrocks-streamload.conf | 6 +-
11 files changed, 341 insertions(+), 126 deletions(-)
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
index 940d806ef3..03afca211b 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -20,24 +20,27 @@ The internal implementation of StarRocks sink connector is
cached and imported b
## Sink Options
-| Name | Type | Required | Default |
Description
|
-|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| nodeUrls | list | yes | - |
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
|
-| base-url | string | yes | - | The
JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030`
or `jdbc:mysql://localhost:9030/db`
|
-| username | string | yes | - |
`StarRocks` user username
|
-| password | string | yes | - |
`StarRocks` user password
|
-| database | string | yes | - | The
name of StarRocks database
|
-| table | string | no | - | The
name of StarRocks table, If not set, the table name will be the name of the
upstream table
|
-| labelPrefix | string | no | - | The
prefix of StarRocks stream load label
|
-| batch_max_rows | long | no | 1024 | For
batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`checkpoint.interval`, the data will be flushed into the StarRocks |
-| batch_max_bytes | int | no | 5 * 1024 * 1024 | For
batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`checkpoint.interval`, the data will be flushed into the StarRocks |
-| max_retries | int | no | - | The
number of retries to flush failed
|
-| retry_backoff_multiplier_ms | int | no | - | Using
as a multiplier for generating the next delay for backoff
|
-| max_retry_backoff_ms | int | no | - | The
amount of time to wait before attempting to retry a request to `StarRocks`
|
-| enable_upsert_delete | boolean | no | false | Whether
to enable upsert/delete, only supports PrimaryKey model.
|
-| save_mode_create_template | string | no | see below | see
below
|
-| starrocks.config | map | no | - | The
parameter of the stream load `data_desc`
|
-| http_socket_timeout_ms | int | no | 180000 | Set
http socket timeout, default is 3 minutes.
|
+| Name | Type | Required | Default
|
Description
|
+|-----------------------------|---------|----------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| nodeUrls | list | yes | -
| `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
|
+| base-url | string | yes | -
| The JDBC URL like `jdbc:mysql://localhost:9030/` or
`jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db`
|
+| username | string | yes | -
| `StarRocks` user username
|
+| password | string | yes | -
| `StarRocks` user password
|
+| database | string | yes | -
| The name of StarRocks database
|
+| table | string | no | -
| The name of StarRocks table, If not set, the table name will be the name
of the upstream table
|
+| labelPrefix | string | no | -
| The prefix of StarRocks stream load label
|
+| batch_max_rows | long | no | 1024
| For batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| batch_max_bytes | int | no | 5 * 1024 * 1024
| For batch writing, when the number of buffers reaches the number of
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| max_retries | int | no | -
| The number of retries to flush failed
|
+| retry_backoff_multiplier_ms | int | no | -
| Using as a multiplier for generating the next delay for backoff
|
+| max_retry_backoff_ms | int | no | -
| The amount of time to wait before attempting to retry a request to
`StarRocks`
|
+| enable_upsert_delete | boolean | no | false
| Whether to enable upsert/delete, only supports PrimaryKey model.
|
+| save_mode_create_template | string | no | see below
| see below
|
+| starrocks.config | map | no | -
| The parameter of the stream load `data_desc`
|
+| http_socket_timeout_ms | int | no | 180000
| Set http socket timeout, default is 3 minutes.
|
+| schema_save_mode | Enum | no |
CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on,
different treatment schemes are selected for the existing table structure of
the target side.
|
+| data_save_mode | Enum | no | APPEND_DATA
| Before the synchronous task is turned on, different processing schemes are
selected for the data already existing on the target side.
|
+| custom_sql | String | no | -
| When data_save_mode selects CUSTOM_PROCESSING, you should fill in the
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be
executed. SQL will be executed before synchronization tasks. |
### save_mode_create_template
@@ -45,6 +48,40 @@ We use templates to automatically create starrocks tables,
which will create corresponding table creation statements based on the type of
upstream data and schema type,
and the default template can be modified according to the situation. Only work
on multi-table mode at now.
+### table [string]
+
+Use `database` and this `table-name` to auto-generate SQL and write upstream
input data to the database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
+The table parameter can be filled with the name of a table that does not yet
exist, which will eventually be used as the name of the created table, and it
supports variables (`${table_name}`, `${schema_name}`). Replacement rules:
`${schema_name}` will be replaced with the SCHEMA name passed to the target
side, and `${table_name}` will be replaced with the name of the table passed
to the target side.
+
+for example:
+1. test_${schema_name}_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+### schema_save_mode[Enum]
+
+Before the synchronous task is turned on, different treatment schemes are
selected for the existing table structure on the target side.
+Option introduction:
+`RECREATE_SCHEMA` : Will create the table when it does not exist, and delete
and rebuild it when the table already exists
+`CREATE_SCHEMA_WHEN_NOT_EXIST` : Will create the table when it does not
exist, and skip creation when the table already exists
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not
exist
+
+### data_save_mode[Enum]
+
+Before the synchronous task is turned on, different processing schemes are
selected for the data already existing on the target side.
+Option introduction:
+`DROP_DATA`: Preserve database structure and delete data
+`APPEND_DATA`:Preserve database structure, preserve data
+`CUSTOM_PROCESSING`:User defined processing
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### custom_sql[String]
+
+When data_save_mode selects CUSTOM_PROCESSING, you should fill in the
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be
executed. SQL will be executed before synchronization tasks.
+
```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
@@ -222,6 +259,28 @@ sink {
}
```
+### Use save_mode function
+
+```
+sink {
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "test_${schema_name}_${table_name}"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
+ batch_max_rows = 10
+ starrocks.config = {
+ format = "CSV"
+ column_separator = "\\x01"
+ row_delimiter = "\\x02"
+ }
+ }
+}
+```
+
## Changelog
### next version
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 0c37322199..3dc7eebfa6 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
import org.apache.commons.lang3.StringUtils;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysql.cj.MysqlType;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -63,6 +65,7 @@ import java.util.stream.IntStream;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+@Slf4j
public class StarRocksCatalog implements Catalog {
protected final String catalogName;
@@ -72,6 +75,7 @@ public class StarRocksCatalog implements Catalog {
protected final String baseUrl;
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
+ private final String template;
private static final Set<String> SYS_DATABASES = new HashSet<>();
private static final Logger LOG =
LoggerFactory.getLogger(StarRocksCatalog.class);
@@ -81,10 +85,10 @@ public class StarRocksCatalog implements Catalog {
SYS_DATABASES.add("_statistics_");
}
- public StarRocksCatalog(String catalogName, String username, String pwd,
String defaultUrl) {
+ public StarRocksCatalog(
+ String catalogName, String username, String pwd, String
defaultUrl, String template) {
checkArgument(StringUtils.isNotBlank(username));
- checkArgument(StringUtils.isNotBlank(pwd));
checkArgument(StringUtils.isNotBlank(defaultUrl));
urlInfo = JdbcUrlUtil.getUrlInfo(defaultUrl);
this.baseUrl = urlInfo.getUrlWithoutDatabase();
@@ -95,6 +99,7 @@ public class StarRocksCatalog implements Catalog {
this.catalogName = catalogName;
this.username = username;
this.pwd = pwd;
+ this.template = template;
}
@Override
@@ -208,13 +213,64 @@ public class StarRocksCatalog implements Catalog {
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
- throw new UnsupportedOperationException();
+ this.createTable(
+ StarRocksSaveModeUtil.fillingCreateSql(
+ template,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ table.getTableSchema()));
}
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ if (ignoreIfNotExists) {
+ conn.createStatement().execute("DROP TABLE IF EXISTS " +
tablePath.getFullName());
+ } else {
+ conn.createStatement()
+ .execute(String.format("DROP TABLE %s",
tablePath.getFullName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ if (ignoreIfNotExists) {
+ conn.createStatement()
+ .execute(String.format("TRUNCATE TABLE %s",
tablePath.getFullName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed TRUNCATE TABLE in catalog %s",
tablePath.getFullName()),
+ e);
+ }
+ }
+
+ public void executeSql(TablePath tablePath, String sql) {
+ try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ connection.createStatement().execute(sql);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed EXECUTE SQL in
catalog %s", sql), e);
+ }
+ }
+
+ public boolean isExistsData(TablePath tablePath) {
+ try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ String sql = String.format("select * from %s limit 1",
tablePath.getFullName());
+ ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ if (resultSet == null) {
+ return false;
+ }
+ return resultSet.next();
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed Connection JDBC error %s",
tablePath.getTableName()), e);
+ }
}
@Override
@@ -336,6 +392,7 @@ public class StarRocksCatalog implements Catalog {
public void createTable(String sql)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ log.info("create table sql is :{}", sql);
conn.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 8204e0b539..94a93b3f56 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import com.google.auto.service.AutoService;
@@ -36,7 +37,8 @@ public class StarRocksCatalogFactory implements
CatalogFactory {
catalogName,
options.get(StarRocksOptions.USERNAME),
options.get(StarRocksOptions.PASSWORD),
- options.get(StarRocksOptions.BASE_URL));
+ options.get(StarRocksOptions.BASE_URL),
+ options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 6862a87bc5..88b608dab3 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import lombok.Getter;
import lombok.Setter;
@@ -58,7 +59,9 @@ public class SinkConfig implements Serializable {
private String saveModeCreateTemplate;
+ private SchemaSaveMode schemaSaveMode;
private DataSaveMode dataSaveMode;
+ private String customSql;
private int httpSocketTimeout;
@@ -91,7 +94,9 @@ public class SinkConfig implements Serializable {
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
- sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
+
sinkConfig.setSchemaSaveMode(config.get(StarRocksSinkOptions.SCHEMA_SAVE_MODE));
+
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.DATA_SAVE_MODE));
+ sinkConfig.setCustomSql(config.get(StarRocksSinkOptions.CUSTOM_SQL));
sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
return sinkConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 6500d1474d..c6cd605063 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -19,12 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -131,17 +129,29 @@ public interface StarRocksSinkOptions {
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");
+ Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription(
+ "different treatment schemes are selected for the
existing surface structure of the target side");
- SingleChoiceOption<DataSaveMode> SAVE_MODE =
- Options.key(SupportSaveMode.DATA_SAVE_MODE_KEY)
- .singleChoice(DataSaveMode.class,
Arrays.asList(DataSaveMode.APPEND_DATA))
+ Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription(
- "Table structure and data processing methods that
already exist on the target end");
+ "different processing schemes are selected for
data existing data on the target side");
Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
Options.key("http_socket_timeout_ms")
.intType()
.defaultValue(3 * 60 * 1000)
.withDescription("Set http socket timeout, default is 3
minutes.");
+
+ Option<String> CUSTOM_SQL =
+ Options.key("custom_sql")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("when schema_save_mode selects
CUSTOM_PROCESSING custom SQL");
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ee613dd972..b9040f72d4 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,10 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -38,15 +42,17 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
private SeaTunnelRowType seaTunnelRowType;
private final SinkConfig sinkConfig;
- private final DataSaveMode dataSaveMode;
-
+ private DataSaveMode dataSaveMode;
+ private SchemaSaveMode schemaSaveMode;
private final CatalogTable catalogTable;
- public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
+ public StarRocksSink(
+ SinkConfig sinkConfig, CatalogTable catalogTable, ReadonlyConfig
readonlyConfig) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
this.catalogTable = catalogTable;
this.dataSaveMode = sinkConfig.getDataSaveMode();
+ this.schemaSaveMode = sinkConfig.getSchemaSaveMode();
}
@Override
@@ -54,27 +60,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
return StarRocksCatalogFactory.IDENTIFIER;
}
- private void autoCreateTable(String template) {
- StarRocksCatalog starRocksCatalog =
- new StarRocksCatalog(
- "StarRocks",
- sinkConfig.getUsername(),
- sinkConfig.getPassword(),
- sinkConfig.getJdbcUrl());
- if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
-
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""),
true);
- }
- if (!starRocksCatalog.tableExists(
- TablePath.of(sinkConfig.getDatabase(),
sinkConfig.getTable()))) {
- starRocksCatalog.createTable(
- StarRocksSaveModeUtil.fillingCreateSql(
- template,
- sinkConfig.getDatabase(),
- sinkConfig.getTable(),
- catalogTable.getTableSchema()));
- }
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
@@ -82,18 +67,26 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
- return Optional.empty();
- }
-
- /*@Override
- public DataSaveMode getUserConfigSaveMode() {
- return dataSaveMode;
+ TablePath tablePath =
+ TablePath.of(
+ catalogTable.getTableId().getDatabaseName(),
+ catalogTable.getTableId().getSchemaName(),
+ catalogTable.getTableId().getTableName());
+ Catalog catalog =
+ new StarRocksCatalog(
+ "StarRocks",
+ sinkConfig.getUsername(),
+ sinkConfig.getPassword(),
+ sinkConfig.getJdbcUrl(),
+ sinkConfig.getSaveModeCreateTemplate());
+ catalog.open();
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode,
+ dataSaveMode,
+ catalog,
+ tablePath,
+ catalogTable,
+ sinkConfig.getCustomSql()));
}
-
- @Override
- public void handleSaveMode(DataSaveMode saveMode) {
- if (catalogTable != null) {
- autoCreateTable(sinkConfig.getSaveModeCreateTemplate());
- }
- }*/
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 9c0a8b42d1..081645270f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -31,6 +33,11 @@ import org.apache.commons.lang3.StringUtils;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;
+
@AutoService(Factory.class)
public class StarRocksSinkFactory implements TableSinkFactory {
@Override
@@ -54,9 +61,14 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
- StarRocksSinkOptions.SAVE_MODE,
+ StarRocksSinkOptions.SCHEMA_SAVE_MODE,
+ StarRocksSinkOptions.DATA_SAVE_MODE,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
+ .conditional(
+ DATA_SAVE_MODE,
+ DataSaveMode.CUSTOM_PROCESSING,
+ StarRocksSinkOptions.CUSTOM_SQL)
.build();
}
@@ -67,6 +79,47 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
if (StringUtils.isBlank(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
- return () -> new StarRocksSink(sinkConfig, catalogTable);
+ // get source table relevant information
+ TableIdentifier tableId = catalogTable.getTableId();
+ String sourceDatabaseName = tableId.getDatabaseName();
+ String sourceSchemaName = tableId.getSchemaName();
+ String sourceTableName = tableId.getTableName();
+ // get sink table relevant information
+ String sinkDatabaseName = sinkConfig.getDatabase();
+ String sinkTableName = sinkConfig.getTable();
+ // to replace
+ String finalDatabaseName =
+ sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName);
+ String finalTableName = this.replaceFullTableName(sinkTableName,
tableId);
+ // rebuild TableIdentifier and catalogTable
+ TableIdentifier newTableId =
+ TableIdentifier.of(
+ tableId.getCatalogName(), finalDatabaseName, null,
finalTableName);
+ catalogTable =
+ CatalogTable.of(
+ newTableId,
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getCatalogName());
+
+ CatalogTable finalCatalogTable = catalogTable;
+ // reset
+ sinkConfig.setTable(finalTableName);
+ sinkConfig.setDatabase(finalDatabaseName);
+ return () -> new StarRocksSink(sinkConfig, finalCatalogTable,
context.getOptions());
+ }
+
+ private String replaceFullTableName(String original, TableIdentifier
tableId) {
+ if (StringUtils.isNotBlank(tableId.getDatabaseName())) {
+ original = original.replace(REPLACE_DATABASE_NAME_KEY,
tableId.getDatabaseName());
+ }
+ if (StringUtils.isNotBlank(tableId.getSchemaName())) {
+ original = original.replace(REPLACE_SCHEMA_NAME_KEY,
tableId.getSchemaName());
+ }
+ if (StringUtils.isNotBlank(tableId.getTableName())) {
+ original = original.replace(REPLACE_TABLE_NAME_KEY,
tableId.getTableName());
+ }
+ return original;
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
index d692d85999..bdc7a15244 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java
@@ -36,7 +36,7 @@ public class StarRocksCatalogTest {
public void testCatalog() {
StarRocksCatalog catalog =
new StarRocksCatalog(
- "starrocks", "root", "123456",
"jdbc:mysql://47.108.65.163:9030/");
+ "starrocks", "root", "123456",
"jdbc:mysql://47.108.65.163:9030/", "");
List<String> databases = catalog.listDatabases();
LOGGER.info("find databases: " + databases);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
index e76e5a7748..38e4f7eabe 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml
@@ -25,6 +25,9 @@
<artifactId>connector-starrocks-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : StarRocks</name>
+ <properties>
+ <mysql.version>8.0.27</mysql.version>
+ </properties>
<dependencies>
<!-- SeaTunnel connectors -->
<dependency>
@@ -39,5 +42,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index a0630d760a..1bd694b102 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.e2e.connector.starrocks;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -28,24 +31,20 @@ import
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.io.InputStream;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
@@ -64,18 +63,16 @@ import java.util.stream.Stream;
import static org.awaitility.Awaitility.given;
@Slf4j
-@Disabled("There are still errors unfixed @Hisoka-X")
public class StarRocksIT extends TestSuiteBase implements TestResource {
private static final String DOCKER_IMAGE =
"d87904488/starrocks-starter:2.2.1";
private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
private static final String HOST = "starrocks_e2e";
private static final int SR_DOCKER_PORT = 9030;
private static final int SR_PORT = 9033;
-
- private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
private static final String USERNAME = "root";
private static final String PASSWORD = "";
private static final String DATABASE = "test";
+ private static final String URL = "jdbc:mysql://%s:" + SR_PORT;
private static final String SOURCE_TABLE = "e2e_table_source";
private static final String SINK_TABLE = "e2e_table_sink";
private static final String SR_DRIVER_JAR =
@@ -94,36 +91,7 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
+ " SMALLINT_COL SMALLINT,\n"
+ " TINYINT_COL TINYINT,\n"
+ " BOOLEAN_COL BOOLEAN,\n"
- + " DECIMAL_COL DECIMAL,\n"
- + " DOUBLE_COL DOUBLE,\n"
- + " FLOAT_COL FLOAT,\n"
- + " INT_COL INT,\n"
- + " CHAR_COL CHAR,\n"
- + " VARCHAR_11_COL VARCHAR(11),\n"
- + " STRING_COL STRING,\n"
- + " DATETIME_COL DATETIME,\n"
- + " DATE_COL DATE\n"
- + ")ENGINE=OLAP\n"
- + "DUPLICATE KEY(`BIGINT_COL`)\n"
- + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\",\n"
- + "\"in_memory\" = \"false\","
- + "\"storage_format\" = \"DEFAULT\""
- + ")";
-
- private static final String DDL_SINK =
- "create table "
- + DATABASE
- + "."
- + SINK_TABLE
- + " (\n"
- + " BIGINT_COL BIGINT,\n"
- + " LARGEINT_COL LARGEINT,\n"
- + " SMALLINT_COL SMALLINT,\n"
- + " TINYINT_COL TINYINT,\n"
- + " BOOLEAN_COL BOOLEAN,\n"
- + " DECIMAL_COL DECIMAL,\n"
+ + " DECIMAL_COL Decimal(12, 1),\n"
+ " DOUBLE_COL DOUBLE,\n"
+ " FLOAT_COL FLOAT,\n"
+ " INT_COL INT,\n"
@@ -214,9 +182,9 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
Short.parseShort("1"),
Byte.parseByte("1"),
Boolean.FALSE,
- BigDecimal.valueOf(2222243, 1),
+ BigDecimal.valueOf(12345, 1),
Double.parseDouble("2222243.2222243"),
- Float.parseFloat("222224"),
+ Float.parseFloat("22.17"),
Integer.parseInt("1"),
"a",
"VARCHAR_COL",
@@ -249,11 +217,14 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
try {
assertHasData(SINK_TABLE);
- String sourceSql = String.format("select * from %s.%s", DATABASE,
SOURCE_TABLE);
- String sinkSql = String.format("select * from %s.%s", DATABASE,
SINK_TABLE);
+ String sourceSql =
+ String.format(
+ "select * from %s.%s order by BIGINT_COL ",
DATABASE, SOURCE_TABLE);
+ String sinkSql =
+ String.format("select * from %s.%s order by BIGINT_COL ",
DATABASE, SINK_TABLE);
List<String> columnList =
Arrays.stream(COLUMN_STRING.split(","))
- .map(x -> x.trim())
+ .map(String::trim)
.collect(Collectors.toList());
Statement sourceStatement = jdbcConnection.createStatement();
Statement sinkStatement = jdbcConnection.createStatement();
@@ -262,27 +233,19 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(
sourceResultSet.getMetaData().getColumnCount(),
sinkResultSet.getMetaData().getColumnCount());
+ log.info(container.getServerLogs());
while (sourceResultSet.next()) {
if (sinkResultSet.next()) {
for (String column : columnList) {
Object source = sourceResultSet.getObject(column);
Object sink = sinkResultSet.getObject(column);
if (!Objects.deepEquals(source, sink)) {
- InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
- InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
- String sourceValue =
- IOUtils.toString(sourceAsciiStream,
StandardCharsets.UTF_8);
- String sinkValue =
- IOUtils.toString(sinkAsciiStream,
StandardCharsets.UTF_8);
- Assertions.assertEquals(sourceValue, sinkValue);
+ Assertions.assertEquals(String.valueOf(source),
String.valueOf(sink));
}
}
}
}
- // Check the row numbers is equal
- sourceResultSet.last();
- sinkResultSet.last();
- Assertions.assertEquals(sourceResultSet.getRow(),
sinkResultSet.getRow());
+ Assertions.assertFalse(sinkResultSet.next());
clearSinkTable();
} catch (Exception e) {
throw new RuntimeException("get starRocks connection error", e);
@@ -310,7 +273,7 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
// create source table
statement.execute(DDL_SOURCE);
// create sink table
- statement.execute(DDL_SINK);
+ // statement.execute(DDL_SINK);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
@@ -354,4 +317,64 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
throw new RuntimeException("test starrocks server image error", e);
}
}
+
+ @TestTemplate
+ public void testCatalog(TestContainer container) throws IOException,
InterruptedException {
+ TablePath tablePathStarRocksSource = TablePath.of("test",
"e2e_table_source");
+ TablePath tablePathStarRocksSink = TablePath.of("test",
"e2e_table_source_2");
+ StarRocksCatalog starRocksCatalog =
+ new StarRocksCatalog(
+ "StarRocks",
+ "root",
+ PASSWORD,
+ String.format(URL, starRocksServer.getHost()),
+ "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n
DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n
PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" ,
\n \"storage_format\" = \"DEFAULT\" \n )");
+ starRocksCatalog.open();
+ CatalogTable catalogTable =
starRocksCatalog.getTable(tablePathStarRocksSource);
+ // sink tableExists ?
+ starRocksCatalog.dropTable(tablePathStarRocksSink, true);
+ boolean tableExistsBefore =
starRocksCatalog.tableExists(tablePathStarRocksSink);
+ Assertions.assertFalse(tableExistsBefore);
+ // create table
+ starRocksCatalog.createTable(tablePathStarRocksSink, catalogTable,
true);
+ boolean tableExistsAfter =
starRocksCatalog.tableExists(tablePathStarRocksSink);
+ Assertions.assertTrue(tableExistsAfter);
+ // isExistsData ?
+ boolean existsDataBefore =
starRocksCatalog.isExistsData(tablePathStarRocksSink);
+ Assertions.assertFalse(existsDataBefore);
+ // insert one data
+ String customSql =
+ "insert into "
+ + DATABASE
+ + "."
+ + "e2e_table_source_2"
+ + " (\n"
+ + " BIGINT_COL,\n"
+ + " LARGEINT_COL,\n"
+ + " SMALLINT_COL,\n"
+ + " TINYINT_COL,\n"
+ + " BOOLEAN_COL,\n"
+ + " DECIMAL_COL,\n"
+ + " DOUBLE_COL,\n"
+ + " FLOAT_COL,\n"
+ + " INT_COL,\n"
+ + " CHAR_COL,\n"
+ + " VARCHAR_11_COL,\n"
+ + " STRING_COL,\n"
+ + " DATETIME_COL,\n"
+ + " DATE_COL\n"
+ + ")values(\n"
+ + "\t
999,12345,1,1,false,1.1,9.9,2.5,3,'A','ADC','ASEDF','2022-08-13
17:35:59','2022-08-13'\n"
+ + ")";
+ starRocksCatalog.executeSql(tablePathStarRocksSink, customSql);
+ boolean existsDataAfter =
starRocksCatalog.isExistsData(tablePathStarRocksSink);
+ Assertions.assertTrue(existsDataAfter);
+ // truncateTable
+ starRocksCatalog.truncateTable(tablePathStarRocksSink, true);
+
Assertions.assertFalse(starRocksCatalog.isExistsData(tablePathStarRocksSink));
+ // drop table
+ starRocksCatalog.dropTable(tablePathStarRocksSink, true);
+
Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink));
+ starRocksCatalog.close();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index 54f6de4b6c..7b4c25af73 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -61,10 +61,14 @@ sink {
table = "e2e_table_sink"
batch_max_rows = 100
max_retries = 3
-
+ base-url="jdbc:mysql://starrocks_e2e:9030/test"
starrocks.config = {
format = "JSON"
strip_outer_array = true
}
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="APPEND_DATA"
+ save_mode_create_template = "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n
DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n
PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" ,
\n \"storage_format\" = \"DEFAULT\" \n )"
+
}
}
\ No newline at end of file