This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4e417520 [Improve] add test case for cdc sync (#580)
4e417520 is described below
commit 4e417520885e3d507094bd2c84b61fa626ad6596
Author: wudi <[email protected]>
AuthorDate: Wed Mar 19 10:02:28 2025 +0800
[Improve] add test case for cdc sync (#580)
---
.../jsondebezium/JsonDebeziumSchemaChange.java | 5 ++-
.../JsonDebeziumSchemaChangeImplV2.java | 4 +-
.../jsondebezium/SQLParserSchemaChange.java | 1 +
.../flink/container/e2e/Mysql2DorisE2ECase.java | 45 ++++++++++++++++++----
.../doris/flink/lookup/DorisLookupTableITCase.java | 6 ++-
.../container/e2e/mysql2doris/testMySQL2Doris.txt | 6 ++-
.../e2e/mysql2doris/testMySQL2Doris_init.sql | 2 +-
7 files changed, 56 insertions(+), 13 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 16757d27..ff47880d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -46,12 +46,13 @@ import java.util.regex.Pattern;
/**
* Synchronize the schema change in the upstream data source to the doris
database table.
*
- * <p>There are two schema change modes:<br>
+ * <p>There are three schema change modes:<br>
* 1. {@link JsonDebeziumSchemaChangeImpl} only supports table column name and
column type changes,
* and this mode is used by default. <br>
* 2. {@link JsonDebeziumSchemaChangeImplV2} supports table column name,
column type, default,
* comment synchronization, supports multi-column changes, and supports column
name rename. Need to
- * be enabled by configuring use-new-schema-change.
+ * be enabled by configuring use-new-schema-change. <br>
+ * 3. {@link SQLParserSchemaChange} performs schema changes by parsing the upstream DDL statements.
*/
public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
private static final Logger LOG =
LoggerFactory.getLogger(JsonDebeziumSchemaChange.class);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index abd6f55b..63559e52 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -55,8 +55,9 @@ import java.util.regex.Pattern;
/**
* Extract the columns that need to be changed based on the change records of
the upstream data
- * source.
+ * source. It is recommended to use {@link SQLParserSchemaChange} instead.
*/
+@Deprecated
public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
private static final Logger LOG =
LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
private static final Pattern renameDDLPattern =
@@ -183,6 +184,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
// remove backtick
ddl = ddl.replace("`", "");
// rename ddl
+ // Prefer the sql_parser schema change mode for column renames
Matcher renameDdlMatcher = renameDDLPattern.matcher(ddl);
if (renameDdlMatcher.find()) {
String oldColumnName = renameDdlMatcher.group(2);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index eda223d6..1f145da4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+/** Schema changes are made by parsing upstream DDL statements. */
public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
private static final Logger LOG =
LoggerFactory.getLogger(SQLParserSchemaChange.class);
private final SQLParserSchemaManager sqlParserSchemaManager;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
index cb7d83ad..e1c8c807 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -99,10 +99,22 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
// wait 2 times checkpoint
Thread.sleep(20000);
+ LOG.info("Start to verify create table result.");
+ String tblQuery =
+ String.format(
+ "SELECT TABLE_NAME \n"
+ + "FROM INFORMATION_SCHEMA.TABLES \n"
+ + "WHERE TABLE_SCHEMA = '%s'",
+ DATABASE);
+ List<String> expectedTables =
+ Arrays.asList("ods_tbl1_incr", "ods_tbl2_incr",
"ods_tbl3_incr", "ods_tbl5_incr");
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1,
false);
+
LOG.info("Start to verify init result.");
List<String> expected = Arrays.asList("doris_1,1", "doris_2,2",
"doris_3,3", "doris_5,5");
String sql1 =
- "select * from ( select * from test_e2e_mysql.tbl1 union all
select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3
union all select * from test_e2e_mysql.tbl5) res order by 1";
+ "select * from ( select * from test_e2e_mysql.ods_tbl1_incr
union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from
test_e2e_mysql.ods_tbl3_incr union all select * from
test_e2e_mysql.ods_tbl5_incr) res order by 1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected,
sql1, 2);
// add incremental data
@@ -121,28 +133,47 @@ public class Mysql2DorisE2ECase extends
AbstractE2EService {
Arrays.asList(
"doris_1,18", "doris_1_1,10", "doris_2_1,11",
"doris_3,3", "doris_3_1,12");
String sql2 =
- "select * from ( select * from test_e2e_mysql.tbl1 union all
select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 )
res order by 1";
+ "select * from ( select * from test_e2e_mysql.ods_tbl1_incr
union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from
test_e2e_mysql.ods_tbl3_incr ) res order by 1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2,
sql2, 2);
- // mock schema change
+ // mock schema changes: ADD COLUMN, DROP COLUMN, RENAME COLUMN old_name TO new_name,
+ // and CHANGE COLUMN old_name new_name type
LOG.info("start to schema change in mysql.");
ContainerUtils.executeSQLStatement(
getMySQLQueryConnection(),
LOG,
"alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
- "alter table test_e2e_mysql.tbl1 drop column age");
+ "alter table test_e2e_mysql.tbl1 drop column age",
+ "alter table test_e2e_mysql.tbl2 rename column age to
age_new_1",
+ "alter table test_e2e_mysql.tbl3 change column age age_new_2
int");
Thread.sleep(10000);
ContainerUtils.executeSQLStatement(
getMySQLQueryConnection(),
LOG,
- "insert into test_e2e_mysql.tbl1 values
('doris_1_1_1','c1_val')");
+ "insert into test_e2e_mysql.tbl1 values
('doris_1_1_1','c1_val')",
+ "insert into test_e2e_mysql.tbl2 values
('doris_tbl2_rename_test',18)",
+ "insert into test_e2e_mysql.tbl3 values
('doris_tbl3_rename_test',38)");
Thread.sleep(20000);
- LOG.info("verify tal1 schema change.");
+ LOG.info("verify tbl1 schema change.");
List<String> schemaChangeExpected =
Arrays.asList("doris_1,null", "doris_1_1,null",
"doris_1_1_1,c1_val");
- String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by
1";
+ String schemaChangeSql = "select name,c1 from
test_e2e_mysql.ods_tbl1_incr order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, schemaChangeExpected,
schemaChangeSql, 2);
+
+ LOG.info("verify tbl2 schema change.");
+ schemaChangeExpected = Arrays.asList("doris_2_1,11",
"doris_tbl2_rename_test,18");
+ schemaChangeSql = "select name,age_new_1 from
test_e2e_mysql.ods_tbl2_incr order by 1";
ContainerUtils.checkResult(
getDorisQueryConnection(), LOG, schemaChangeExpected,
schemaChangeSql, 2);
+
+ LOG.info("verify tbl3 schema change.");
+ schemaChangeExpected =
+ Arrays.asList("doris_3,3", "doris_3_1,12",
"doris_tbl3_rename_test,38");
+ schemaChangeSql = "select name,age_new_2 from
test_e2e_mysql.ods_tbl3_incr order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, schemaChangeExpected,
schemaChangeSql, 2);
+
cancelE2EJob(jobName);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
index 8ecb87da..d14666a6 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
@@ -353,7 +353,11 @@ public class DorisLookupTableITCase extends
AbstractITCaseService {
+ "'password' = '%s',"
+ "'lookup.jdbc.async' = '%s',"
+ "'lookup.cache.ttl' = '10m',"
- + "'lookup.cache.max-rows' = '3'"
+ + "'lookup.cache.max-rows' = '3',"
+ + "'lookup.max-retries' = '1',"
+ + "'lookup.jdbc.read.batch.size' = '2',"
+ + "'lookup.jdbc.read.batch.queue-size' = '25',"
+ + "'lookup.jdbc.read.thread-size' = '5'"
+ ")",
getFenodes(),
getDorisQueryUrl(),
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
index 90a0eddc..6e248d19 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -1,8 +1,12 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
+ --table-prefix ods_
+ --table-suffix _incr
--including-tables "tbl.*"
+ --excluding-tables "tbl4"
--sink-conf sink.ignore.update-before=false
--table-conf replication_num=1
--single-sink true
- --ignore-default-value false
\ No newline at end of file
+ --ignore-default-value false
+ --schema-change-mode sql_parser
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
index f1042491..8bdde210 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -29,7 +29,7 @@ CREATE TABLE test_e2e_mysql.tbl4 (
`name` varchar(256) primary key,
`age` int
);
-
+insert into test_e2e_mysql.tbl4 values ('doris_4',4);
DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
CREATE TABLE test_e2e_mysql.tbl5 (
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]