This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new fb254846 [fix](cdc)fix excluding pattern not working (#390)
fb254846 is described below
commit fb25484677481af5a0d18b48f900122f883f614e
Author: Petrichor <[email protected]>
AuthorDate: Fri Jun 7 11:15:00 2024 +0800
[fix](cdc)fix excluding pattern not working (#390)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 3 +--
.../doris/flink/tools/cdc/DatabaseSyncTest.java | 28 +++++++++++++++++++---
3 files changed, 27 insertions(+), 6 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index a1f511a8..691eaafa 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -376,7 +376,7 @@ public abstract class DatabaseSync {
} else {
String excludingPattern =
String.format("?!(%s\\.(%s))$", getTableListPrefix(),
excludingTables);
- return String.format("(%s)(%s)", includingPattern,
excludingPattern);
+ return String.format("(%s)(%s)", excludingPattern,
includingPattern);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 03d3d076..63522472 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -230,8 +230,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
@Override
public String getTableListPrefix() {
- String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
- return databaseName;
+ return config.get(MySqlSourceOptions.DATABASE_NAME);
}
/**
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index 1e69c598..f0cd0a51 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/** Unit tests for the {@link DatabaseSync}. */
public class DatabaseSyncTest {
@@ -69,7 +71,7 @@ public class DatabaseSyncTest {
public void getTableBucketsTest() throws SQLException {
String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
DatabaseSync databaseSync = new MysqlDatabaseSync();
- Map<String, Integer> tableBucketsMap =
databaseSync.getTableBuckets(tableBuckets);
+ Map<String, Integer> tableBucketsMap =
DatabaseSync.getTableBuckets(tableBuckets);
assertEquals(10, tableBucketsMap.get("tbl1").intValue());
assertEquals(20, tableBucketsMap.get("tbl2").intValue());
assertEquals(30, tableBucketsMap.get("a.*").intValue());
@@ -81,7 +83,7 @@ public class DatabaseSyncTest {
public void setTableSchemaBucketsTest() throws SQLException {
DatabaseSync databaseSync = new MysqlDatabaseSync();
String tableSchemaBuckets =
"tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70";
- Map<String, Integer> tableBucketsMap =
databaseSync.getTableBuckets(tableSchemaBuckets);
+ Map<String, Integer> tableBucketsMap =
DatabaseSync.getTableBuckets(tableSchemaBuckets);
List<String> tableList =
Arrays.asList(
"tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13",
"b1", "b11", "b2",
@@ -103,7 +105,7 @@ public class DatabaseSyncTest {
public void setTableSchemaBucketsTest1() throws SQLException {
DatabaseSync databaseSync = new MysqlDatabaseSync();
String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40";
- Map<String, Integer> tableBucketsMap =
databaseSync.getTableBuckets(tableSchemaBuckets);
+ Map<String, Integer> tableBucketsMap =
DatabaseSync.getTableBuckets(tableSchemaBuckets);
List<String> tableList = Arrays.asList("a1", "a2", "a3", "b1", "a");
HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets1();
Set<String> tableSet = new HashSet<>();
@@ -147,4 +149,24 @@ public class DatabaseSyncTest {
matchedTableBucketsMap.put("tbl1", 10);
return matchedTableBucketsMap;
}
+
+ @Test
+ public void singleSinkTablePatternTest() throws SQLException {
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync.setSingleSink(true);
+ databaseSync.setIncludingTables(".*");
+ databaseSync.setExcludingTables("customer|dates|lineorder");
+ Configuration config = new Configuration();
+ config.setString("database-name", "ssb_test");
+ databaseSync.setConfig(config);
+ List<String> tableList =
+ Arrays.asList("customer", "dates", "lineorder", "test1",
"test2", "test3");
+ String syncTableListPattern = databaseSync.getSyncTableList(tableList);
+ assertTrue("ssb_test.test1".matches(syncTableListPattern));
+ assertTrue("ssb_test.test2".matches(syncTableListPattern));
+ assertTrue("ssb_test.test3".matches(syncTableListPattern));
+ assertFalse("ssb_test.customer".matches(syncTableListPattern));
+ assertFalse("ssb_test.dates".matches(syncTableListPattern));
+ assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]