This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3c6f216135 [Fix][Connector-V2] Fix possible data loss in scenarios where
request_tablet_size is less than the number of BUCKETS (#8768)
3c6f216135 is described below
commit 3c6f216135dfd48c2539a400b596ef82ce24fcb3
Author: xiaochen <[email protected]>
AuthorDate: Fri Feb 21 11:39:24 2025 +0800
[Fix][Connector-V2] Fix possible data loss in scenarios where
request_tablet_size is less than the number of BUCKETS (#8768)
---
.../client/source/StarRocksBeReadClient.java | 5 +++-
.../e2e/connector/starrocks/StarRocksIT.java | 9 +++++-
.../starrocks-thrift-to-starrocks-streamload.conf | 1 +
...ks-streamload.conf => starrocks-to-assert.conf} | 32 ++++++++++------------
4 files changed, 28 insertions(+), 19 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
index c0be0106bb..3fa50f1cc0 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
@@ -92,7 +92,6 @@ public class StarRocksBeReadClient implements Serializable {
}
public void openScanner(QueryPartition partition, SeaTunnelRowType
seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
params.setTablet_ids(new ArrayList<>(tabletIds));
@@ -135,6 +134,10 @@ public class StarRocksBeReadClient implements Serializable
{
contextId,
tabletIds.size(),
tabletIds);
+ this.eos.set(false);
+ this.rowBatch = null;
+ this.readerOffset = 0;
+ this.seaTunnelRowType = seaTunnelRowType;
}
public boolean hasNext() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index c49b1bfa41..2d2eda1ae2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -105,7 +105,7 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
+ " DATE_COL DATE\n"
+ ")ENGINE=OLAP\n"
+ "DUPLICATE KEY(`BIGINT_COL`)\n"
- + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n"
+ + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\",\n"
+ "\"in_memory\" = \"false\","
@@ -419,4 +419,11 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink));
starRocksCatalog.close();
}
+
+ @TestTemplate
+ public void testStarRocksReadRowCount(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/starrocks-to-assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
index ca47a8eb08..8af2b36107 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
@@ -28,6 +28,7 @@ source {
database = "test"
table = "e2e_table_source"
max_retries = 3
+ request_tablet_size = 5
schema {
fields {
BIGINT_COL = BIGINT
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
similarity index 68%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
index ca47a8eb08..416d1ec853 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert.conf
@@ -28,6 +28,7 @@ source {
database = "test"
table = "e2e_table_source"
max_retries = 3
+ request_tablet_size = 1
schema {
fields {
BIGINT_COL = BIGINT
@@ -54,22 +55,19 @@ transform {
}
sink {
- StarRocks {
- nodeUrls = ["starrocks_e2e:8030"]
- username = root
- password = ""
- database = "test"
- table = "e2e_table_sink"
- batch_max_rows = 100
- max_retries = 3
- base-url="jdbc:mysql://starrocks_e2e:9030/test"
- starrocks.config = {
- format = "JSON"
- strip_outer_array = true
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
}
- "schema_save_mode"="RECREATE_SCHEMA"
- "data_save_mode"="APPEND_DATA"
- save_mode_create_template = "CREATE TABLE IF NOT EXISTS
`${database}`.`${table}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE
KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES
(\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n
\"storage_format\" = \"DEFAULT\" \n )"
-
- }
}
\ No newline at end of file