This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 766b6905 [Chore] Split some case logic (#487)
766b6905 is described below
commit 766b69056a8796448352b3364b901086d402d74c
Author: wudi <[email protected]>
AuthorDate: Wed Sep 11 15:27:28 2024 +0800
[Chore] Split some case logic (#487)
---
.../doris/flink/tools/cdc/DatabaseSyncConfig.java | 2 --
.../doris/flink/tools/cdc/DorisTableConfig.java | 12 ++++++-----
.../flink/container/e2e/Doris2DorisE2ECase.java | 23 ++--------------------
.../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 4 ++--
.../flink/tools/cdc/CdcMongoSyncDatabaseCase.java | 4 ++--
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 4 ++--
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 4 ++--
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 4 ++--
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 4 ++--
9 files changed, 21 insertions(+), 40 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
index 6c78d5cd..e1a089ff 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
@@ -69,8 +69,6 @@ public class DatabaseSyncConfig {
public static final String SINGLE_SINK = "single-sink";
////////// doris-table-conf //////////
public static final String TABLE_CONF = "table-conf";
- public static final String REPLICATION_NUM = "replication_num";
- public static final String TABLE_BUCKETS = "table-buckets";
////////// date-converter-conf //////////
public static final String CONVERTERS = "converters";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
index 912ed698..6318fc8a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -26,8 +26,11 @@ import java.util.Map;
import java.util.Objects;
public class DorisTableConfig implements Serializable {
- private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+ public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
// PROPERTIES parameter in doris table creation statement. such as:
replication_num=1.
+ public static final String REPLICATION_NUM = "replication_num";
+ public static final String TABLE_BUCKETS = "table-buckets";
+
private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed
and integrated into the
// doris table creation statement. such as:
table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
@@ -48,10 +51,9 @@ public class DorisTableConfig implements Serializable {
if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
}
- if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
- this.tableBuckets =
-
buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
- tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
+ if (tableConfig.containsKey(TABLE_BUCKETS)) {
+ this.tableBuckets =
buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
+ tableConfig.remove(TABLE_BUCKETS);
}
tableProperties = tableConfig;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index 4b4e3b26..f7b3bee7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -24,11 +24,9 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.AbstractContainerTestBase;
import org.apache.doris.flink.container.ContainerUtils;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +35,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-public class Doris2DorisE2ECase extends AbstractE2EService {
+public class Doris2DorisE2ECase extends AbstractContainerTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(Doris2DorisE2ECase.class);
private static final String DATABASE_SOURCE = "test_doris2doris_source";
private static final String DATABASE_SINK = "test_doris2doris_sink";
private static final String TABLE = "test_tbl";
- @Before
- public void setUp() throws InterruptedException {
- LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
- SEMAPHORE.acquire();
- LOG.info("Doris2DorisE2ECase semaphore acquired.");
- }
-
@Test
public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
@@ -163,14 +154,4 @@ public class Doris2DorisE2ECase extends AbstractE2EService
{
ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG,
sinkInitSql);
LOG.info("Initialization of doris table successful.");
}
-
- @After
- public void close() {
- try {
- // Ensure that semaphore is always released
- } finally {
- LOG.info("Doris2DorisE2ECase releasing semaphore.");
- SEMAPHORE.release();
- }
- }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 0666cb9d..05536557 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -65,8 +65,8 @@ public class CdcDb2SyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "FULL_TYPES";
String excludingTables = null;
String multiToOneOrigin = null;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
index 09006b2c..ffc8a75d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
@@ -68,8 +68,8 @@ public class CdcMongoSyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1");
String includingTables = "cdc_test";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c430ea87..e85e888f 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -67,8 +67,8 @@ public class CdcMysqlSyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
// String includingTables = "tbl1|tbl2|tbl3";
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index a47212c8..92600ffd 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -62,8 +62,8 @@ public class CdcOraclelSyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 99892e02..33184011 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -66,8 +66,8 @@ public class CdcPostgresSyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index ca6a3121..7a1cf276 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -65,8 +65,8 @@ public class CdcSqlServerSyncDatabaseCase {
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
- tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DorisTableConfig.TABLE_BUCKETS,
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]