This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b5140f598e [Improvement] add starrocks jdbc dialect (#7294)
b5140f598e is described below
commit b5140f598ee8f02ba7e6d25428c56a5904aa01ad
Author: Jarvis <[email protected]>
AuthorDate: Wed Aug 14 22:33:57 2024 +0800
[Improvement] add starrocks jdbc dialect (#7294)
---
docs/en/connector-v2/sink/Jdbc.md | 4 +-
docs/en/connector-v2/source/Jdbc.md | 2 +-
docs/zh/connector-v2/sink/Jdbc.md | 4 +-
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../dialect/mysql/MySqlDialectFactory.java | 5 +++
.../StarRocksDialect.java} | 26 ++++++-------
.../seatunnel/jdbc/JdbcStarRocksdbIT.java | 5 ++-
.../src/test/resources/jdbc_starrocks_dialect.conf | 44 ++++++++++++++++++++++
8 files changed, 71 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index c46933b486..8ec58506b4 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -82,7 +82,9 @@ Use this sql write upstream input datas to database. e.g
`INSERT ...`
### compatible_mode [string]
-The compatible mode of database, required when the database supports multiple
compatible modes. For example, when using OceanBase database, you need to set
it to 'mysql' or 'oracle'.
+The compatible mode of database, required when the database supports multiple
compatible modes.
+
+For example, when using OceanBase database, you need to set it to 'mysql' or
'oracle'. when using StarRocks, you need set it to `starrocks`.
Postgres 9.5 version or below,please set it to `postgresLow` to support cdc
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 7fab8d50b2..1b9acc025b 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -46,7 +46,7 @@ supports query SQL and can achieve projection effect.
| user | String | No | -
| userName
[...]
| password | String | No | -
| password
[...]
| query | String | No | -
| Query statement
[...]
-| compatible_mode | String | No | -
| The compatible mode of database, required when the database supports
multiple compatible modes. For example, when using OceanBase database, you need
to set it to 'mysql' or 'oracle'.
[...]
+| compatible_mode | String | No | -
| The compatible mode of database, required when the database supports
multiple compatible modes.<br/> For example, when using OceanBase database, you
need to set it to 'mysql' or 'oracle'. <br/> when using starrocks, you need set
it to `starrocks`
[...]
| connection_check_timeout_sec | Int | No | 30
| The time in seconds to wait for the database operation used to validate
the connection to complete.
[...]
| partition_column | String | No | -
| The column name for split data.
[...]
| partition_upper_bound | Long | No | -
| The partition_column max value for scan, if not set SeaTunnel will query
database get max value.
[...]
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index d61292cb92..0cc8605a37 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -79,7 +79,9 @@ JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test`
### compatible_mode [string]
-数据库的兼容模式,当数据库支持多种兼容模式时需要。例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。
+数据库的兼容模式,当数据库支持多种兼容模式时需要。
+
+例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle'
。使用StarRocks时,需要将其设置为`starrocks`。
Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index bf00298a74..45f849c28b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -25,6 +25,7 @@ public class DatabaseIdentifier {
public static final String INFORMIX = "Informix";
public static final String KINGBASE = "KingBase";
public static final String MYSQL = "MySQL";
+ public static final String STARROCKS = "StarRocks";
public static final String ORACLE = "Oracle";
public static final String PHOENIX = "Phoenix";
public static final String POSTGRESQL = "Postgres";
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
index a4f89a4dc8..f8278a60cc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks.StarRocksDialect;
import com.google.auto.service.AutoService;
@@ -39,6 +41,9 @@ public class MySqlDialectFactory implements
JdbcDialectFactory {
@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+ if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) {
+ return new StarRocksDialect(fieldIde);
+ }
return new MysqlDialect(fieldIde);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
similarity index 64%
copy from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
copy to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
index a4f89a4dc8..d7ee796527 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
@@ -15,30 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
-import com.google.auto.service.AutoService;
+public class StarRocksDialect extends MysqlDialect {
-import javax.annotation.Nonnull;
+ public StarRocksDialect() {}
-/** Factory for {@link MysqlDialect}. */
-@AutoService(JdbcDialectFactory.class)
-public class MySqlDialectFactory implements JdbcDialectFactory {
- @Override
- public boolean acceptsURL(String url) {
- return url.startsWith("jdbc:mysql:");
+ public StarRocksDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
}
@Override
- public JdbcDialect create() {
- return new MysqlDialect();
+ public String dialectName() {
+ return DatabaseIdentifier.STARROCKS;
}
@Override
- public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
- return new MysqlDialect(fieldIde);
+ public String hashModForField(String fieldName, int mod) {
+ return "ABS(md5sum_numeric(" + quoteIdentifier(fieldName) + ") % " +
mod + ")";
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index e7fc94e642..1d41c480c3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -37,7 +37,7 @@ import java.util.List;
public class JdbcStarRocksdbIT extends AbstractJdbcIT {
- private static final String DOCKER_IMAGE =
"d87904488/starrocks-starter:2.2.1";
+ private static final String DOCKER_IMAGE =
"starrocks/allin1-ubuntu:2.5.12";
private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
private static final String NETWORK_ALIASES = "e2e_starRocksdb";
private static final int SR_PORT = 9030;
@@ -51,7 +51,8 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
private static final String SINK_TABLE = "e2e_table_sink";
private static final List<String> CONFIG_FILE =
- Lists.newArrayList("/jdbc_starrocks_source_to_sink.conf");
+ Lists.newArrayList(
+ "/jdbc_starrocks_source_to_sink.conf",
"/jdbc_starrocks_dialect.conf");
private static final String CREATE_SQL =
"create table %s (\n"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
new file mode 100644
index 0000000000..69fe5538f5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
+}
+
+source {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://e2e_starRocksdb:9030"
+ user = root
+ password = ""
+ query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL,
BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL,
VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from
`test`.`e2e_table_source`"
+ partition_column = "STRING_COL"
+ compatible_mode = "starrocks"
+ }
+}
+
+sink {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://e2e_starRocksdb:9030"
+ user = root
+ password = ""
+ query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL,
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL,
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+}