This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 4561a8a32baa46e2f2b27d42ad4a099ee62f32a7
Author: lvyanquan <[email protected]>
AuthorDate: Thu Dec 28 15:54:13 2023 +0800

    [FLINK-34638][cdc-common] Support column with default value
---
 .../org/apache/flink/cdc/common/schema/Column.java | 82 +++++++++++++++++++---
 .../flink/cdc/common/schema/PhysicalColumn.java    |  9 ++-
 .../org/apache/flink/cdc/common/schema/Schema.java | 15 ++++
 .../doris/sink/DorisMetadataApplier.java           |  7 +-
 .../parser/CustomAlterTableParserListener.java     |  5 +-
 .../source/reader/MySqlPipelineRecordEmitter.java  |  6 +-
 .../mysql/source/MySqlPipelineITCase.java          |  2 +-
 .../starrocks/sink/StarRocksMetadataApplier.java   |  7 +-
 .../connectors/starrocks/sink/StarRocksUtils.java  |  3 +-
 .../schema/coordinator/SchemaManager.java          |  7 +-
 .../schema/coordinator/SchemaRegistry.java         |  1 +
 .../operators/transform/TableChangeInfo.java       |  2 +-
 .../serializer/schema/ColumnSerializer.java        | 13 +++-
 .../schema/PhysicalColumnSerializer.java           | 36 ++++++++--
 .../serializer/schema/SchemaSerializer.java        |  6 +-
 .../schema/PhysicalColumnSerializerTest.java       |  3 +-
 16 files changed, 174 insertions(+), 30 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java
index 5c1c59d5e..910032c46 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java
@@ -38,9 +38,16 @@ public abstract class Column implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    protected static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
+    protected static final String 
FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION =
+            "%s %s '%s'";
 
-    protected static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
+    protected static final String 
FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
+            "%s %s '%s'";
+
+    protected static final String 
FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
+            "%s %s '%s' '%s'";
+
+    protected static final String 
FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = "%s %s";
 
     protected final String name;
 
@@ -48,10 +55,29 @@ public abstract class Column implements Serializable {
 
     protected final @Nullable String comment;
 
+    /**
+     * Save the literal value of the column's default value, For uncertain 
functions such as UUID(),
+     * the value is null, For the current time function such as 
CURRENT_TIMESTAMP(), the value is
+     * Unix Epoch time(1970-01-01 00:00:00).
+     */
+    protected final @Nullable String defaultValueExpression;
+
     protected Column(String name, DataType type, @Nullable String comment) {
         this.name = name;
         this.type = type;
         this.comment = comment;
+        this.defaultValueExpression = null;
+    }
+
+    protected Column(
+            String name,
+            DataType type,
+            @Nullable String comment,
+            @Nullable String defaultValueExpression) {
+        this.name = name;
+        this.type = type;
+        this.comment = comment;
+        this.defaultValueExpression = defaultValueExpression;
     }
 
     /** Returns the name of this column. */
@@ -69,17 +95,41 @@ public abstract class Column implements Serializable {
         return comment;
     }
 
+    @Nullable
+    public String getDefaultValueExpression() {
+        return defaultValueExpression;
+    }
+
     /** Returns a string that summarizes this column for printing to a 
console. */
     public String asSummaryString() {
         if (comment == null) {
-            return String.format(
-                    FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), 
type.asSummaryString());
+            if (defaultValueExpression == null) {
+                return String.format(
+                        
FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
+                        escapeIdentifier(name),
+                        type.asSummaryString());
+            } else {
+                return String.format(
+                        
FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
+                        escapeIdentifier(name),
+                        type.asSummaryString(),
+                        defaultValueExpression);
+            }
         } else {
-            return String.format(
-                    FIELD_FORMAT_WITH_DESCRIPTION,
-                    escapeIdentifier(name),
-                    type.asSummaryString(),
-                    escapeSingleQuotes(comment));
+            if (defaultValueExpression == null) {
+                return String.format(
+                        
FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
+                        escapeIdentifier(name),
+                        type.asSummaryString(),
+                        escapeSingleQuotes(comment));
+            } else {
+                return String.format(
+                        
FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
+                        escapeIdentifier(name),
+                        type.asSummaryString(),
+                        escapeSingleQuotes(comment),
+                        defaultValueExpression);
+            }
         }
     }
 
@@ -103,12 +153,13 @@ public abstract class Column implements Serializable {
         Column column = (Column) o;
         return name.equals(column.name)
                 && type.equals(column.type)
-                && Objects.equals(comment, column.comment);
+                && Objects.equals(comment, column.comment)
+                && Objects.equals(defaultValueExpression, 
column.defaultValueExpression);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, type, comment);
+        return Objects.hash(name, type, comment, defaultValueExpression);
     }
 
     @Override
@@ -116,6 +167,15 @@ public abstract class Column implements Serializable {
         return asSummaryString();
     }
 
+    /** Creates a physical column. */
+    public static PhysicalColumn physicalColumn(
+            String name,
+            DataType type,
+            @Nullable String comment,
+            @Nullable String defaultValueExpression) {
+        return new PhysicalColumn(name, type, comment, defaultValueExpression);
+    }
+
     /** Creates a physical column. */
     public static PhysicalColumn physicalColumn(
             String name, DataType type, @Nullable String comment) {
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java
index 1f3e26427..bf711f791 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java
@@ -32,6 +32,11 @@ public class PhysicalColumn extends Column {
         super(name, type, comment);
     }
 
+    public PhysicalColumn(
+            String name, DataType type, @Nullable String comment, @Nullable 
String defaultValue) {
+        super(name, type, comment, defaultValue);
+    }
+
     @Override
     public boolean isPhysical() {
         return true;
@@ -39,11 +44,11 @@ public class PhysicalColumn extends Column {
 
     @Override
     public Column copy(DataType newType) {
-        return new PhysicalColumn(name, newType, comment);
+        return new PhysicalColumn(name, newType, comment, 
defaultValueExpression);
     }
 
     @Override
     public Column copy(String newName) {
-        return new PhysicalColumn(newName, type, comment);
+        return new PhysicalColumn(newName, type, comment, 
defaultValueExpression);
     }
 }
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index 62657be88..2db509e6e 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -316,6 +316,21 @@ public class Schema implements Serializable {
             return this;
         }
 
+        /**
+         * Declares a physical column that is appended to this schema.
+         *
+         * @param columnName column name
+         * @param type data type of the column
+         * @param comment description of the column
+         * @param defaultValue default value of the column
+         */
+        public Builder physicalColumn(
+                String columnName, DataType type, String comment, String 
defaultValue) {
+            checkColumn(columnName, type);
+            columns.add(Column.physicalColumn(columnName, type, comment, 
defaultValue));
+            return this;
+        }
+
         /**
          * Declares a metadata column that is appended to this schema.
          *
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index ad025c629..8ea6f6032 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -134,7 +134,11 @@ public class DorisMetadataApplier implements 
MetadataApplier {
             }
             fieldSchemaMap.put(
                     column.getName(),
-                    new FieldSchema(column.getName(), typeString, 
column.getComment()));
+                    new FieldSchema(
+                            column.getName(),
+                            typeString,
+                            column.getDefaultValueExpression(),
+                            column.getComment()));
         }
         return fieldSchemaMap;
     }
@@ -170,6 +174,7 @@ public class DorisMetadataApplier implements 
MetadataApplier {
                     new FieldSchema(
                             column.getName(),
                             buildTypeString(column.getType()),
+                            column.getDefaultValueExpression(),
                             column.getComment());
             schemaChangeManager.addColumn(
                     tableId.getSchemaName(), tableId.getTableName(), 
addFieldSchema);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 6f3fb9a8c..cde5aa0c5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -253,7 +253,10 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
 
     private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column 
dbzColumn) {
         return org.apache.flink.cdc.common.schema.Column.physicalColumn(
-                dbzColumn.name(), fromDbzColumn(dbzColumn), 
dbzColumn.comment());
+                dbzColumn.name(),
+                fromDbzColumn(dbzColumn),
+                dbzColumn.comment(),
+                dbzColumn.defaultValueExpression().orElse(null));
     }
 
     private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId 
dbzTableId) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index b7434bf94..3f7581347 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -205,7 +205,11 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
             if (!column.isOptional()) {
                 dataType = dataType.notNull();
             }
-            tableBuilder.physicalColumn(colName, dataType, column.comment());
+            tableBuilder.physicalColumn(
+                    colName,
+                    dataType,
+                    column.comment(),
+                    column.defaultValueExpression().orElse(null));
         }
 
         List<String> primaryKey = table.primaryKeyColumnNames();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 5eb6ce0e5..076e57364 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -355,7 +355,7 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                 tableId,
                 Schema.newBuilder()
                         .physicalColumn("id", DataTypes.INT().notNull())
-                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
                         .physicalColumn("description", DataTypes.VARCHAR(512))
                         .physicalColumn("weight", DataTypes.FLOAT())
                         .primaryKey(Collections.singletonList("id"))
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index 5ee93ff62..34b483d00 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;
+
 /** A {@code MetadataApplier} that applies metadata changes to StarRocks. */
 public class StarRocksMetadataApplier implements MetadataApplier {
 
@@ -117,8 +119,9 @@ public class StarRocksMetadataApplier implements 
MetadataApplier {
                     new StarRocksColumn.Builder()
                             .setColumnName(column.getName())
                             .setOrdinalPosition(-1)
-                            .setColumnComment(column.getComment());
-            StarRocksUtils.toStarRocksDataType(column, false, builder);
+                            .setColumnComment(column.getComment())
+                            
.setDefaultValue(column.getDefaultValueExpression());
+            toStarRocksDataType(column, false, builder);
             addColumns.add(builder.build());
         }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index ccab99c7a..50dab2ac4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -84,7 +84,8 @@ public class StarRocksUtils {
                     new StarRocksColumn.Builder()
                             .setColumnName(column.getName())
                             .setOrdinalPosition(i)
-                            .setColumnComment(column.getComment());
+                            .setColumnComment(column.getComment())
+                            
.setDefaultValue(column.getDefaultValueExpression());
             toStarRocksDataType(column, i < primaryKeyCount, builder);
             starRocksColumns.add(builder.build());
         }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
index 99335180d..30fc86a60 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
@@ -172,7 +172,11 @@ public class SchemaManager {
     /** Serializer for {@link SchemaManager}. */
     public static class Serializer implements 
SimpleVersionedSerializer<SchemaManager> {
 
-        public static final int CURRENT_VERSION = 1;
+        /**
+         * Update history: from Version 3.0.0, set to 0, from version 3.1.1, 
updated to 1, from
+         * version 3.2.0, updated to 2.
+         */
+        public static final int CURRENT_VERSION = 2;
 
         @Override
         public int getVersion() {
@@ -214,6 +218,7 @@ public class SchemaManager {
             switch (version) {
                 case 0:
                 case 1:
+                case 2:
                     TableIdSerializer tableIdSerializer = 
TableIdSerializer.INSTANCE;
                     SchemaSerializer schemaSerializer = 
SchemaSerializer.INSTANCE;
                     try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 02abb8903..8de38e140 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -214,6 +214,7 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
                         break;
                     }
                 case 1:
+                case 2:
                     {
                         int length = in.readInt();
                         byte[] serializedSchemaManager = new byte[length];
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
index 7fab9c65a..fa04fa50a 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
@@ -109,7 +109,7 @@ public class TableChangeInfo {
     /** Serializer for {@link TableChangeInfo}. */
     public static class Serializer implements 
SimpleVersionedSerializer<TableChangeInfo> {
 
-        public static final int CURRENT_VERSION = 1;
+        public static final int CURRENT_VERSION = 2;
 
         @Override
         public int getVersion() {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java
index d6c80eadd..c8e60386e 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java
@@ -46,6 +46,13 @@ public class ColumnSerializer extends 
TypeSerializerSingleton<Column> {
     private final MetadataColumnSerializer metadataColumnSerializer =
             MetadataColumnSerializer.INSTANCE;
 
+    private static int currentVersion = 2;
+
+    /** Update {@link #currentVersion} as We did not directly include this 
version in the file. */
+    public static void updateVersion(int version) {
+        currentVersion = version;
+    }
+
     @Override
     public boolean isImmutableType() {
         return false;
@@ -92,12 +99,16 @@ public class ColumnSerializer extends 
TypeSerializerSingleton<Column> {
 
     @Override
     public Column deserialize(DataInputView source) throws IOException {
+        return deserialize(currentVersion, source);
+    }
+
+    public Column deserialize(int version, DataInputView source) throws 
IOException {
         ColumnType columnType = enumSerializer.deserialize(source);
         switch (columnType) {
             case METADATA:
                 return metadataColumnSerializer.deserialize(source);
             case PHYSICAL:
-                return physicalColumnSerializer.deserialize(source);
+                return physicalColumnSerializer.deserialize(version, source);
             default:
                 throw new IOException("Unknown column type: " + columnType);
         }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java
index e163d39ad..b91c260f0 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java
@@ -42,6 +42,8 @@ public class PhysicalColumnSerializer extends 
TypeSerializerSingleton<PhysicalCo
     private final DataTypeSerializer dataTypeSerializer = new 
DataTypeSerializer();
     private final StringSerializer stringSerializer = 
StringSerializer.INSTANCE;
 
+    private static final int CURRENT_VERSION = 2;
+
     @Override
     public boolean isImmutableType() {
         return false;
@@ -57,7 +59,8 @@ public class PhysicalColumnSerializer extends 
TypeSerializerSingleton<PhysicalCo
         return Column.physicalColumn(
                 stringSerializer.copy(from.getName()),
                 dataTypeSerializer.copy(from.getType()),
-                stringSerializer.copy(from.getComment()));
+                stringSerializer.copy(from.getComment()),
+                stringSerializer.copy(from.getDefaultValueExpression()));
     }
 
     @Override
@@ -75,14 +78,37 @@ public class PhysicalColumnSerializer extends 
TypeSerializerSingleton<PhysicalCo
         stringSerializer.serialize(record.getName(), target);
         dataTypeSerializer.serialize(record.getType(), target);
         stringSerializer.serialize(record.getComment(), target);
+        stringSerializer.serialize(record.getDefaultValueExpression(), target);
     }
 
     @Override
     public PhysicalColumn deserialize(DataInputView source) throws IOException 
{
-        String name = stringSerializer.deserialize(source);
-        DataType dataType = dataTypeSerializer.deserialize(source);
-        String comment = stringSerializer.deserialize(source);
-        return Column.physicalColumn(name, dataType, comment);
+        return deserialize(CURRENT_VERSION, source);
+    }
+
+    public PhysicalColumn deserialize(int version, DataInputView source) 
throws IOException {
+        switch (version) {
+            case 0:
+            case 1:
+                {
+                    String name = stringSerializer.deserialize(source);
+                    DataType dataType = dataTypeSerializer.deserialize(source);
+                    String comment = stringSerializer.deserialize(source);
+                    return Column.physicalColumn(name, dataType, comment);
+                }
+            case 2:
+                {
+                    String name = stringSerializer.deserialize(source);
+                    DataType dataType = dataTypeSerializer.deserialize(source);
+                    String comment = stringSerializer.deserialize(source);
+                    String defaultValue = stringSerializer.deserialize(source);
+                    return Column.physicalColumn(name, dataType, comment, 
defaultValue);
+                }
+            default:
+                {
+                    throw new IOException("Unrecognized serialization version 
" + version);
+                }
+        }
     }
 
     @Override
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
index 8e2a4fa02..cdaf9e6cf 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
@@ -89,7 +89,7 @@ public class SchemaSerializer extends 
TypeSerializerSingleton<Schema> {
         stringSerializer.serialize(record.comment(), target);
     }
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 2;
 
     @Override
     public Schema deserialize(DataInputView source) throws IOException {
@@ -97,6 +97,9 @@ public class SchemaSerializer extends 
TypeSerializerSingleton<Schema> {
     }
 
     public Schema deserialize(int version, DataInputView source) throws 
IOException {
+        // Manually updating versions because column deserialization is 
wrapped by
+        // ListSerializer.
+        ColumnSerializer.updateVersion(version);
         switch (version) {
             case 0:
                 return Schema.newBuilder()
@@ -106,6 +109,7 @@ public class SchemaSerializer extends 
TypeSerializerSingleton<Schema> {
                         .comment(stringSerializer.deserialize(source))
                         .build();
             case 1:
+            case 2:
                 return Schema.newBuilder()
                         .setColumns(columnsSerializer.deserialize(source))
                         .primaryKey(primaryKeysSerializer.deserialize(source))
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializerTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializerTest.java
index 48efb536e..27477f9db 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializerTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializerTest.java
@@ -44,7 +44,8 @@ public class PhysicalColumnSerializerTest extends 
SerializerTestBase<PhysicalCol
     protected PhysicalColumn[] getTestData() {
         return new PhysicalColumn[] {
             Column.physicalColumn("col1", DataTypes.BIGINT()),
-            Column.physicalColumn("col1", DataTypes.BIGINT(), "comment")
+            Column.physicalColumn("col1", DataTypes.BIGINT(), "comment"),
+            Column.physicalColumn("col1", DataTypes.BIGINT(), "comment", 
"default value")
         };
     }
 }

Reply via email to