This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new db8cda9 schema change triple varchar type length (#135)
db8cda9 is described below
commit db8cda9557660ded9e121aa1e6b0defbcee3d2cb
Author: gnehil <[email protected]>
AuthorDate: Tue Apr 25 23:20:51 2023 +0800
schema change triple varchar type length (#135)
* schema change varchar type length triple
* varchar length less than 65533
---
.../sink/writer/JsonDebeziumSchemaSerializer.java | 47 ++++++++++++++++------
.../writer/TestJsonDebeziumSchemaSerializer.java | 9 ++++-
2 files changed, 42 insertions(+), 14 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 8458091..c7295e2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -95,18 +95,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return null;
}
Map<String, String> valueMap;
- if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
- valueMap = extractAfterRow(recordRoot);
- addDeleteSign(valueMap,false);
- } else if (OP_UPDATE.equals(op)) {
- valueMap = extractAfterRow(recordRoot);
- addDeleteSign(valueMap,false);
- } else if (OP_DELETE.equals(op)) {
- valueMap = extractBeforeRow(recordRoot);
- addDeleteSign(valueMap,true);
- } else {
- LOG.error("parse record fail, unknown op {} in {}",op,record);
- return null;
+ switch (op) {
+ case OP_READ:
+ case OP_CREATE:
+ case OP_UPDATE:
+ valueMap = extractAfterRow(recordRoot);
+ addDeleteSign(valueMap, false);
+ break;
+ case OP_DELETE:
+ valueMap = extractBeforeRow(recordRoot);
+ addDeleteSign(valueMap, true);
+ break;
+ default:
LOG.error("parse record fail, unknown op {} in {}", op, record);
+ return null;
}
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}
@@ -262,7 +264,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
String op = matcher.group(1);
String col = matcher.group(3);
String type = matcher.group(5);
- type = type == null ? "" : type;
+ type = handleType(type);
ddl = String.format(EXECUTE_DDL,
dorisOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
@@ -306,4 +308,23 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
return new JsonDebeziumSchemaSerializer(dorisOptions,
addDropDDLPattern, sourceTableName);
}
}
+
+ private String handleType(String type) {
+
+ if (type == null || "".equals(type)) {
+ return "";
+ }
+
+ // varchar len * 3
+ Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(type);
+ if (matcher.find()) {
+ String len = matcher.group(1);
+ return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
+ }
+
+ return type;
+
+ }
+
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index ed5c37f..de5a4de 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -100,11 +100,18 @@ public class TestJsonDebeziumSchemaSerializer {
@Test
public void testExtractDDL() throws IOException {
- String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)";
+ String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
String record =
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
[...]
JsonNode recordRoot = objectMapper.readTree(record);
String ddl = serializer.extractDDL(recordRoot);
Assert.assertEquals(srcDDL, ddl);
+
+ String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
+ String record1 =
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
[...]
+ JsonNode recordRoot1 = objectMapper.readTree(record1);
+ String ddl1 = serializer.extractDDL(recordRoot1);
+ Assert.assertEquals(targetDDL, ddl1);
+
}
@Ignore
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]