This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 55eebfa8af [Feature][Connector-V2] Starrocks implements multi table 
sink (#8467)
55eebfa8af is described below

commit 55eebfa8af2f3e040be670a61ef46fa8402b891d
Author: Wanming Shi <[email protected]>
AuthorDate: Thu Jan 9 18:32:29 2025 +0800

    [Feature][Connector-V2] Starrocks implements multi table sink (#8467)
---
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  3 +-
 .../starrocks/sink/StarRocksSinkFactory.java       |  2 ++
 .../starrocks/sink/StarRocksSinkWriter.java        |  3 +-
 .../src/test/resources/ddl/shop.sql                |  1 -
 .../starrocks/StarRocksSchemaChangeIT.java         | 38 ++++++++++++++++++++--
 .../src/test/resources/ddl/shop.sql                | 38 +++++++++++++++++++++-
 .../mysqlcdc_to_starrocks_with_schema_change.conf  |  6 ++--
 7 files changed, 81 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 1f6ffcd076..bb0259ec90 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
 import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -40,7 +41,7 @@ import java.util.List;
 import java.util.Optional;
 
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
-        implements SupportSaveMode, SupportSchemaEvolutionSink {
+        implements SupportSaveMode, SupportSchemaEvolutionSink, 
SupportMultiTableSink {
 
     private final TableSchema tableSchema;
     private final SinkConfig sinkConfig;
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index bc851a91ed..6057eb97af 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -37,6 +37,7 @@ import com.google.auto.service.AutoService;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
 import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE;
 
 @AutoService(Factory.class)
@@ -64,6 +65,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
                         StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
                         StarRocksSinkOptions.SCHEMA_SAVE_MODE,
                         StarRocksSinkOptions.DATA_SAVE_MODE,
+                        MULTI_TABLE_SINK_REPLICA,
                         StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
                         StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
                 .conditional(
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index d366408731..612e6e2b72 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -46,7 +47,7 @@ import java.util.Optional;
 
 @Slf4j
 public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
-        implements SupportSchemaEvolutionSinkWriter {
+        implements SupportMultiTableSinkWriter<Void>, 
SupportSchemaEvolutionSinkWriter {
     private StarRocksISerializer serializer;
     private StarRocksSinkManager manager;
     private TableSchema tableSchema;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
index f97d5852f3..9887b4e687 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql
@@ -59,7 +59,6 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
        (108,"jacket","water resistent black wind breaker",0.1),
        (109,"spare tire","24 inch spare tire",22.2);
 
-
 drop table if exists products_on_hand;
 CREATE TABLE products_on_hand (
   product_id INTEGER NOT NULL PRIMARY KEY,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
index ea7fe35fe0..f5b7522499 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java
@@ -174,7 +174,7 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase 
implements TestResour
 
     @TestTemplate
     public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer 
container)
-            throws InterruptedException, IOException {
+            throws InterruptedException, IOException, SQLException {
         String jobId = String.valueOf(JobIdGenerator.newJobId());
         String jobConfigFile = 
"/mysqlcdc_to_starrocks_with_schema_change.conf";
         CompletableFuture.runAsync(
@@ -187,6 +187,11 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase 
implements TestResour
                     }
                 });
         TimeUnit.SECONDS.sleep(20);
+
+        // verify multi table sink
+        verifyDataConsistency("orders");
+        verifyDataConsistency("customers");
+
         // waiting for case1 completed
         assertSchemaEvolutionForAddColumns(
                 DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, 
starRocksConnection);
@@ -194,9 +199,14 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase 
implements TestResour
         assertSchemaEvolutionForDropColumns(
                 DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, 
starRocksConnection);
 
+        insertNewDataIntoMySQL();
+        insertNewDataIntoMySQL();
+        // verify incremental
+        verifyDataConsistency("orders");
+
         // savepoint 1
         Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
-
+        insertNewDataIntoMySQL();
         // case2 drop columns with cdc data at same time
         shopDatabase.setTemplateName("drop_columns").createAndInitialize();
 
@@ -240,6 +250,30 @@ public class StarRocksSchemaChangeIT extends TestSuiteBase 
implements TestResour
         // waiting for case3/case4 completed
         assertTableStructureAndData(
                 DATABASE, SOURCE_TABLE, SINK_TABLE, mysqlConnection, 
starRocksConnection);
+        insertNewDataIntoMySQL();
+        // verify restore
+        verifyDataConsistency("orders");
+    }
+
+    private void insertNewDataIntoMySQL() throws SQLException {
+        mysqlConnection
+                .createStatement()
+                .execute(
+                        "INSERT INTO orders (id, customer_id, order_date, 
total_amount, status) "
+                                + "VALUES (null, 1, '2025-01-04 13:00:00', 
498.99, 'pending')");
+    }
+
+    private void verifyDataConsistency(String tableName) {
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertIterableEquals(
+                                        query(
+                                                String.format(QUERY, DATABASE, 
tableName),
+                                                mysqlConnection),
+                                        query(
+                                                String.format(QUERY, DATABASE, 
tableName),
+                                                starRocksConnection)));
     }
 
     private void assertSchemaEvolutionForAddColumns(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
index be2eaaeca9..b867cd24c3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/ddl/shop.sql
@@ -30,6 +30,30 @@ CREATE TABLE products (
   weight FLOAT
 );
 
+drop table if exists orders;
+
+CREATE TABLE orders (
+  id BIGINT AUTO_INCREMENT PRIMARY KEY,
+  customer_id BIGINT NOT NULL,
+  order_date DATETIME NOT NULL,
+  total_amount DECIMAL ( 10, 2 ) NOT NULL,
+  STATUS VARCHAR ( 50 ) DEFAULT 'pending',
+  created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+  updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
+);
+
+drop table if exists customers;
+
+CREATE TABLE customers (
+  id BIGINT PRIMARY KEY,
+  NAME VARCHAR ( 255 ) NOT NULL,
+  email VARCHAR ( 255 ) NOT NULL,
+  phone VARCHAR ( 50 ),
+  address TEXT,
+  created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+  updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
+);
+
 ALTER TABLE products AUTO_INCREMENT = 101;
 
 INSERT INTO products
@@ -41,4 +65,16 @@ VALUES (101,"scooter","Small 2-wheel scooter",3.14),
        (106,"hammer","16oz carpenter's hammer",1.0),
        (107,"rocks","box of assorted rocks",5.3),
        (108,"jacket","water resistent black wind breaker",0.1),
-       (109,"spare tire","24 inch spare tire",22.2);
\ No newline at end of file
+       (109,"spare tire","24 inch spare tire",22.2);
+
+INSERT INTO orders ( id, customer_id, order_date, total_amount, STATUS )
+VALUES
+    ( 1, 1, '2024-01-01 10:00:00', 299.99, 'completed' ),
+    ( 2, 2, '2024-01-02 11:00:00', 199.99, 'completed' ),
+    ( 3, 3, '2024-01-03 12:00:00', 399.99, 'processing' );
+
+INSERT INTO customers ( id, NAME, email, phone, address )
+VALUES
+    ( 1, 'John Doe', '[email protected]', '123-456-7890', '123 Main St' ),
+    ( 2, 'Jane Smith', '[email protected]', '234-567-8901', '456 Oak Ave' ),
+    ( 3, 'Bob Johnson', '[email protected]', '345-678-9012', '789 Pine Rd' );
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
index 76d86a4e8c..33fafcdf5f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
@@ -21,16 +21,14 @@
 env {
   # You can set engine configuration here
   job.mode = "STREAMING"
-  checkpoint.interval = 5000
-  read_limit.bytes_per_second=7000000
-  read_limit.rows_per_second=400
+  checkpoint.interval = 2000
 }
 
 source {
   MySQL-CDC {
     username = "st_user_source"
     password = "mysqlpw"
-    table-names = ["shop.products"]
+    table-names = ["shop.products", "shop.orders", "shop.customers"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
 
     schema-changes.enabled = true

Reply via email to