This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new ba280cc [fix] Fix testExtractDDLListMultipleColumns unit test (#246)
ba280cc is described below
commit ba280ccf3a34edd84c61981edf9792c09d080dc5
Author: wudongliang <[email protected]>
AuthorDate: Wed Nov 29 10:11:29 2023 +0800
[fix] Fix testExtractDDLListMultipleColumns unit test (#246)
---
.../writer/TestJsonDebeziumSchemaSerializer.java | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 64c27b7..bc037a8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -167,21 +167,23 @@ public class TestJsonDebeziumSchemaSerializer {
@Test
public void testExtractDDLListMultipleColumns() throws IOException {
- String sql0 = "ALTER TABLE test.t1 ADD COLUMN c2 INT";
- String sql1 = "ALTER TABLE test.t1 ADD COLUMN c555 VARCHAR(400)";
- String sql2 = "ALTER TABLE test.t1 ADD COLUMN c666 INT DEFAULT '100'";
- String sql3 = "ALTER TABLE test.t1 ADD COLUMN c4 BIGINT DEFAULT '555'";
- String sql4 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
- String sql5 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
- String sql6 = "ALTER TABLE test.t1 DROP COLUMN name";
- String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time";
- String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1";
- String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc";
- List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4, sql5, sql6, sql7, sql8, sql9);
+ String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
+ String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
+ String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
+ String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
+ List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
+
+ Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
+ originFiledSchemaMap.put("c2", new FieldSchema());
+ originFiledSchemaMap.put("c555", new FieldSchema());
+ originFiledSchemaMap.put("c666", new FieldSchema());
+ originFiledSchemaMap.put("c4", new FieldSchema());
+ originFiledSchemaMap.put("c13", new FieldSchema());
String record
=
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
[...]
JsonNode recordRoot = objectMapper.readTree(record);
+ serializer.setOriginFieldSchemaMap(originFiledSchemaMap);
List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
for (int i = 0; i < ddlSQLList.size(); i++) {
String srcSQL = srcSqlList.get(i);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]