This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 55eebfa8af [Feature][Connector-V2] Starrocks implements multi table sink (#8467)
55eebfa8af is described below
commit 55eebfa8af2f3e040be670a61ef46fa8402b891d
Author: Wanming Shi <[email protected]>
AuthorDate: Thu Jan 9 18:32:29 2025 +0800
[Feature][Connector-V2] Starrocks implements multi table sink (#8467)
---
.../seatunnel/starrocks/sink/StarRocksSink.java | 3 +-
.../starrocks/sink/StarRocksSinkFactory.java | 2 ++
.../starrocks/sink/StarRocksSinkWriter.java | 3 +-
.../src/test/resources/ddl/shop.sql | 1 -
.../starrocks/StarRocksSchemaChangeIT.java | 38 ++++++++++++++++++++--
.../src/test/resources/ddl/shop.sql | 38 +++++++++++++++++++++-
.../mysqlcdc_to_starrocks_with_schema_change.conf | 6 ++--
7 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 1f6ffcd076..bb0259ec90 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -40,7 +41,7 @@ import java.util.List;
import java.util.Optional;
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
- implements SupportSaveMode, SupportSchemaEvolutionSink {
+ implements SupportSaveMode, SupportSchemaEvolutionSink,
SupportMultiTableSink {
private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index bc851a91ed..6057eb97af 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -37,6 +37,7 @@ import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.List;
+import static
org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;
@AutoService(Factory.class)
@@ -64,6 +65,7 @@ public class StarRocksSinkFactory implements TableSinkFactory {
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
StarRocksSinkOptions.DATA_SAVE_MODE,
+ MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.conditional(
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index d366408731..612e6e2b72 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -46,7 +47,7 @@ import java.util.Optional;
@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
- implements SupportSchemaEvolutionSinkWriter {
+ implements SupportMultiTableSinkWriter<Void>,
SupportSchemaEvolutionSinkWriter {
private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
private TableSchema tableSchema;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
index f97d5852f3..9887b4e687 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
@@ -59,7 +59,6 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);
-
drop table if exists products_on_hand;
CREATE TABLE products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
index ea7fe35fe0..f5b7522499 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
@@ -174,7 +174,7 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase implements TestResour
@TestTemplate
public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer
container)
- throws InterruptedException, IOException {
+ throws InterruptedException, IOException, SQLException {
String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile =
"/mysqlcdc_to_starrocks_with_schema_change.conf";
CompletableFuture.runAsync(
@@ -187,6 +187,11 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase implements TestResour
}
});
TimeUnit.SECONDS.sleep(20);
+
+ // verify multi table sink
+ verifyDataConsistency("orders");
+ verifyDataConsistency("customers");
+
// waiting for case1 completed
assertSchemaEvolutionForAddColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection,
starRocksConnection);
@@ -194,9 +199,14 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase implements TestResour
assertSchemaEvolutionForDropColumns(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection,
starRocksConnection);
+ insertNewDataIntoMySQL();
+ insertNewDataIntoMySQL();
+ // verify incremental
+ verifyDataConsistency("orders");
+
// savepoint 1
Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
-
+ insertNewDataIntoMySQL();
// case2 drop columns with cdc data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();
@@ -240,6 +250,30 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase implements TestResour
// waiting for case3/case4 completed
assertTableStructureAndData(
DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection,
starRocksConnection);
+ insertNewDataIntoMySQL();
+ // verify restore
+ verifyDataConsistency("orders");
+ }
+
+ private void insertNewDataIntoMySQL() throws SQLException {
+ mysqlConnection
+ .createStatement()
+ .execute(
+ "INSERT INTO orders (id, customer_id, order_date,
total_amount, status) "
+ + "VALUES (null, 1, '2025-01-04 13:00:00',
498.99, 'pending')");
+ }
+
+ private void verifyDataConsistency(String tableName) {
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertIterableEquals(
+ query(
+ String.format(QUERY, DATABASE,
tableName),
+ mysqlConnection),
+ query(
+ String.format(QUERY, DATABASE,
tableName),
+ starRocksConnection)));
}
private void assertSchemaEvolutionForAddColumns(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
index be2eaaeca9..b867cd24c3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
@@ -30,6 +30,30 @@ CREATE TABLE products (
weight FLOAT
);
+drop table if exists orders;
+
+CREATE TABLE orders (
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
+ customer_id BIGINT NOT NULL,
+ order_date DATETIME NOT NULL,
+ total_amount DECIMAL ( 10, 2 ) NOT NULL,
+ STATUS VARCHAR ( 50 ) DEFAULT 'pending',
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
+);
+
+drop table if exists customers;
+
+CREATE TABLE customers (
+ id BIGINT PRIMARY KEY,
+ NAME VARCHAR ( 255 ) NOT NULL,
+ email VARCHAR ( 255 ) NOT NULL,
+ phone VARCHAR ( 50 ),
+ address TEXT,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
+);
+
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
@@ -41,4 +65,16 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(106,"hammer","16oz carpenter's hammer",1.0),
(107,"rocks","box of assorted rocks",5.3),
(108,"jacket","water resistent black wind breaker",0.1),
- (109,"spare tire","24 inch spare tire",22.2);
\ No newline at end of file
+ (109,"spare tire","24 inch spare tire",22.2);
+
+INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS )
+VALUES
+ ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ),
+ ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ),
+ ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' );
+
+INSERT INTO customers ( id, NAME, email, phone, address )
+VALUES
+ ( 1, 'John Doe', '[email protected]', '123-456-7890', '123 Main St' ),
+ ( 2, 'Jane Smith', '[email protected]', '234-567-8901', '456 Oak Ave' ),
+ ( 3, 'Bob Johnson', '[email protected]', '345-678-9012', '789 Pine Rd' );
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
index 76d86a4e8c..33fafcdf5f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
@@ -21,16 +21,14 @@
env {
# You can set engine configuration here
job.mode = "STREAMING"
- checkpoint.interval = 5000
- read_limit.bytes_per_second=7000000
- read_limit.rows_per_second=400
+ checkpoint.interval = 2000
}
source {
MySQL-CDC {
username = "st_user_source"
password = "mysqlpw"
- table-names = ["shop.products"]
+ table-names = ["shop.products", "shop.orders", "shop.customers"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
schema-changes.enabled = true