This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new bb10b00 [Improve](cdc) Fix default value in cdc (#305)
bb10b00 is described below
commit bb10b000c31af0001436f2c3ba99797600fe8a28
Author: wudi <[email protected]>
AuthorDate: Wed Jan 24 16:22:29 2024 +0800
[Improve](cdc) Fix default value in cdc (#305)
1. Currently, creating a table does not support default values.
2. Currently, when the schema changes, quotation marks are not added to
the default value.
---
.../doris/flink/catalog/doris/DorisSystem.java | 20 ++++++++++++++------
.../doris/flink/sink/schema/SchemaChangeHelper.java | 4 ++--
.../jsondebezium/JsonDebeziumSchemaChangeImplV2.java | 10 ++--------
.../apache/doris/flink/tools/cdc/SourceSchema.java | 6 ++++--
.../flink/sink/schema/SchemaChangeHelperTest.java | 16 ++++++++--------
.../doris/flink/sink/schema/SchemaManagerTest.java | 12 ++++++++++++
.../TestJsonDebeziumSchemaChangeImplV2.java | 2 +-
7 files changed, 43 insertions(+), 27 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 45266e5..86745fa 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -246,12 +246,20 @@ public class DorisSystem implements Serializable {
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
- sql.append(identifier(field.getName()))
- .append(" ")
- .append(fieldType)
- .append(" COMMENT '")
- .append(quoteComment(field.getComment()))
- .append("',");
+ sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+ if (field.getDefaultValue() != null) {
+ sql.append(" DEFAULT " +
quoteDefaultValue(field.getDefaultValue()));
+ }
+ sql.append(" COMMENT
'").append(quoteComment(field.getComment())).append("',");
+ }
+
+ public static String quoteDefaultValue(String defaultValue) {
+ // DEFAULT current_timestamp not need quote
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
}
public static String quoteComment(String comment) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 580b990..4c29c34 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -110,8 +110,8 @@ public class SchemaChangeHelper {
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(name),
type);
- if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
- addDDL = addDDL + " DEFAULT " + defaultValue;
+ if (defaultValue != null) {
+ addDDL = addDDL + " DEFAULT " +
DorisSystem.quoteDefaultValue(defaultValue);
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment)
+ "'";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index b3a90e6..08e8f05 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -369,17 +369,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
return null;
}
- // Due to historical reasons, doris needs to add quotes to
- // the default value of the new column
- // For example in mysql: alter table add column c1 int default 100
- // In Doris: alter table add column c1 int default '100'
- if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
- return defaultValue;
- } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+ if (defaultValue.equals("1970-01-01 00:00:00")) {
// TODO: The default value of setting the current time in CDC is
1970-01-01 00:00:00
return "current_timestamp";
}
- return "'" + defaultValue + "'";
+ return defaultValue;
}
@VisibleForTesting
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 1ad09d7..c525202 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -59,17 +59,19 @@ public abstract class SourceSchema {
String fieldName = rs.getString("COLUMN_NAME");
String comment = rs.getString("REMARKS");
String fieldType = rs.getString("TYPE_NAME");
+ String defaultValue = rs.getString("COLUMN_DEF");
Integer precision = rs.getInt("COLUMN_SIZE");
-
if (rs.wasNull()) {
precision = null;
}
+
Integer scale = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
scale = null;
}
String dorisTypeStr = convertToDorisType(fieldType, precision,
scale);
- fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr,
comment));
+ fields.put(
+ fieldName, new FieldSchema(fieldName, dorisTypeStr,
defaultValue, comment));
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index b577832..17a71f8 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -33,15 +33,15 @@ public class SchemaChangeHelperTest {
@Before
public void setUp() {
- originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)",
"", ""));
+ originFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+ originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+ originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)",
null, ""));
- updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)",
"", ""));
- updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "",
""));
- updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)",
"", ""));
+ updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+ updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+ updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)",
null, ""));
+ updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", null,
""));
+ updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)",
null, ""));
}
@Test
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 977f8da..5b553fc 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -113,6 +113,18 @@ public class SchemaManagerTest {
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT
'comment \"\\'sdf\\''",
addColumnDDL);
+
+ field = new FieldSchema("col", "int", "10", "comment \"'sdf'");
+ addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink",
field);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT
'10' COMMENT 'comment \"\\'sdf\\''",
+ addColumnDDL);
+
+ field = new FieldSchema("col", "int", "current_timestamp", "comment
\"'sdf'");
+ addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink",
field);
+ Assert.assertEquals(
+ "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT
current_timestamp COMMENT 'comment \"\\'sdf\\''",
+ addColumnDDL);
}
@Test
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 0ce60d3..11df3e0 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -154,7 +154,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)",
null, null));
srcFiledSchemaMap.put(
"test_time", new FieldSchema("test_time", "DATETIMEV2(0)",
null, null));
- srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'",
null));
+ srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
schemaChange.setSourceConnector("mysql");
String columnsString =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]