This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bb10b00  [Improve](cdc) Fix default value in cdc (#305)
bb10b00 is described below

commit bb10b000c31af0001436f2c3ba99797600fe8a28
Author: wudi <[email protected]>
AuthorDate: Wed Jan 24 16:22:29 2024 +0800

    [Improve](cdc) Fix default value in cdc (#305)
    
    1. Currently, creating a table does not support default value.
    2. Currently, when the schema changes, quotation marks will not be added to 
the default value.
---
 .../doris/flink/catalog/doris/DorisSystem.java       | 20 ++++++++++++++------
 .../doris/flink/sink/schema/SchemaChangeHelper.java  |  4 ++--
 .../jsondebezium/JsonDebeziumSchemaChangeImplV2.java | 10 ++--------
 .../apache/doris/flink/tools/cdc/SourceSchema.java   |  6 ++++--
 .../flink/sink/schema/SchemaChangeHelperTest.java    | 16 ++++++++--------
 .../doris/flink/sink/schema/SchemaManagerTest.java   | 12 ++++++++++++
 .../TestJsonDebeziumSchemaChangeImplV2.java          |  2 +-
 7 files changed, 43 insertions(+), 27 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 45266e5..86745fa 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -246,12 +246,20 @@ public class DorisSystem implements Serializable {
         if (isKey && DorisType.STRING.equals(fieldType)) {
             fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
         }
-        sql.append(identifier(field.getName()))
-                .append(" ")
-                .append(fieldType)
-                .append(" COMMENT '")
-                .append(quoteComment(field.getComment()))
-                .append("',");
+        sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+        if (field.getDefaultValue() != null) {
+            sql.append(" DEFAULT " + 
quoteDefaultValue(field.getDefaultValue()));
+        }
+        sql.append(" COMMENT 
'").append(quoteComment(field.getComment())).append("',");
+    }
+
+    public static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp not need quote
+        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+            return defaultValue;
+        }
+        return "'" + defaultValue + "'";
     }
 
     public static String quoteComment(String comment) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 580b990..4c29c34 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -110,8 +110,8 @@ public class SchemaChangeHelper {
                         DorisSystem.quoteTableIdentifier(tableIdentifier),
                         DorisSystem.identifier(name),
                         type);
-        if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
-            addDDL = addDDL + " DEFAULT " + defaultValue;
+        if (defaultValue != null) {
+            addDDL = addDDL + " DEFAULT " + 
DorisSystem.quoteDefaultValue(defaultValue);
         }
         if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
             addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) 
+ "'";
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index b3a90e6..08e8f05 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -369,17 +369,11 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
             return null;
         }
-        // Due to historical reasons, doris needs to add quotes to
-        // the default value of the new column
-        // For example in mysql: alter table add column c1 int default 100
-        // In Doris: alter table add column c1 int default '100'
-        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
-            return defaultValue;
-        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+        if (defaultValue.equals("1970-01-01 00:00:00")) {
             // TODO: The default value of setting the current time in CDC is 
1970-01-01 00:00:00
             return "current_timestamp";
         }
-        return "'" + defaultValue + "'";
+        return defaultValue;
     }
 
     @VisibleForTesting
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 1ad09d7..c525202 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -59,17 +59,19 @@ public abstract class SourceSchema {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String comment = rs.getString("REMARKS");
                 String fieldType = rs.getString("TYPE_NAME");
+                String defaultValue = rs.getString("COLUMN_DEF");
                 Integer precision = rs.getInt("COLUMN_SIZE");
-
                 if (rs.wasNull()) {
                     precision = null;
                 }
+
                 Integer scale = rs.getInt("DECIMAL_DIGITS");
                 if (rs.wasNull()) {
                     scale = null;
                 }
                 String dorisTypeStr = convertToDorisType(fieldType, precision, 
scale);
-                fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, 
comment));
+                fields.put(
+                        fieldName, new FieldSchema(fieldName, dorisTypeStr, 
defaultValue, comment));
             }
         }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index b577832..17a71f8 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -33,15 +33,15 @@ public class SchemaChangeHelperTest {
 
     @Before
     public void setUp() {
-        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", 
"", ""));
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", 
null, ""));
 
-        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
-        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
-        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", 
"", ""));
-        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", 
""));
-        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", 
"", ""));
+        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", null, ""));
+        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", null, ""));
+        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", 
null, ""));
+        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", null, 
""));
+        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", 
null, ""));
     }
 
     @Test
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 977f8da..5b553fc 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -113,6 +113,18 @@ public class SchemaManagerTest {
         Assert.assertEquals(
                 "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 
'comment \"\\'sdf\\''",
                 addColumnDDL);
+
+        field = new FieldSchema("col", "int", "10", "comment \"'sdf'");
+        addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", 
field);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT 
'10' COMMENT 'comment \"\\'sdf\\''",
+                addColumnDDL);
+
+        field = new FieldSchema("col", "int", "current_timestamp", "comment 
\"'sdf'");
+        addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", 
field);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT 
current_timestamp COMMENT 'comment \"\\'sdf\\''",
+                addColumnDDL);
     }
 
     @Test
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 0ce60d3..11df3e0 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -154,7 +154,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", 
null, null));
         srcFiledSchemaMap.put(
                 "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", 
null, null));
-        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", 
null));
+        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "100", null));
 
         schemaChange.setSourceConnector("mysql");
         String columnsString =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to