This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 926c381024 [cdc] Support computed column referring to each other while 
sync_table (#5972)
926c381024 is described below

commit 926c381024260c0f5bbcbf3d696d0362f661d23b
Author: JackeyLee007 <jackeylee...@126.com>
AuthorDate: Tue Aug 5 10:01:44 2025 +0800

    [cdc] Support computed column referring to each other while sync_table 
(#5972)
---
 .../paimon/flink/action/cdc/ComputedColumn.java    |  14 --
 .../flink/action/cdc/ComputedColumnUtils.java      | 145 ++++++-------------
 .../apache/paimon/flink/action/cdc/Expression.java | 154 ++++-----------------
 .../action/cdc/format/AbstractRecordParser.java    |  14 +-
 .../flink/action/cdc/mysql/MySqlRecordParser.java  |  35 ++++-
 .../action/cdc/postgres/PostgresRecordParser.java  |  17 ++-
 .../paimon/flink/action/cdc/utils/DfsSort.java     | 103 ++++++++++++++
 .../apache/paimon/flink/sink/cdc/CdcSchema.java    |   5 +-
 .../flink/action/cdc/ComputedColumnUtilsTest.java  |  41 +++---
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  64 +++++++++
 .../flink/action/cdc/utils/DfsSortTestTest.java    |  55 ++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  |   9 ++
 12 files changed, 366 insertions(+), 290 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index dddb909cd7..5e9041a120 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -53,11 +53,6 @@ public class ComputedColumn implements Serializable {
         return expression.fieldReference();
     }
 
-    @Nullable
-    public DataType fieldReferenceType() {
-        return expression.fieldReferenceType();
-    }
-
     /** Compute column's value from given argument. Return null if input is 
null. */
     @Nullable
     public String eval(@Nullable String input) {
@@ -66,13 +61,4 @@ public class ComputedColumn implements Serializable {
         }
         return expression.eval(input);
     }
-
-    /** Compute column's value from given argument. Return null if input is 
null. */
-    @Nullable
-    public String eval(@Nullable String input, DataType inputType) {
-        if (fieldReference() != null && input == null) {
-            return null;
-        }
-        return expression.eval(input, inputType);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 28fbef9455..7bd9dd561d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -18,22 +18,19 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.flink.action.cdc.utils.DfsSort;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** Utility methods for {@link ComputedColumn}, such as build. */
 public class ComputedColumnUtils {
 
@@ -50,16 +47,44 @@ public class ComputedColumnUtils {
                         .collect(
                                 Collectors.toMap(DataField::name, 
DataField::type, (v1, v2) -> v2));
 
+        // sort computed column args by dependencies
+        LinkedHashMap<String, Tuple2<String, String[]>> sortedArgs =
+                sortComputedColumnArgs(computedColumnArgs, caseSensitive);
+
         List<ComputedColumn> computedColumns = new ArrayList<>();
-        for (String columnArg : computedColumnArgs) {
-            String[] kv = columnArg.split("=");
+        for (Map.Entry<String, Tuple2<String, String[]>> columnArg : 
sortedArgs.entrySet()) {
+            String columnName = columnArg.getKey().trim();
+            String exprName = columnArg.getValue().f0.trim();
+            String[] args = columnArg.getValue().f1;
+
+            Expression expr = Expression.create(typeMapping, caseSensitive, 
exprName, args);
+            ComputedColumn cmpColumn = new ComputedColumn(columnName, expr);
+            computedColumns.add(new ComputedColumn(columnName, expr));
+
+            // remember the column type for later reference by other computed 
columns
+            typeMapping.put(columnName, cmpColumn.columnType());
+        }
+
+        return computedColumns;
+    }
+
+    private static LinkedHashMap<String, Tuple2<String, String[]>> 
sortComputedColumnArgs(
+            List<String> computedColumnArgs, boolean caseSensitive) {
+        List<String> argList =
+                computedColumnArgs.stream()
+                        .map(x -> caseSensitive ? x : x.toUpperCase())
+                        .collect(Collectors.toList());
+
+        LinkedHashMap<String, Tuple2<String, String[]>> eqMap = new 
LinkedHashMap<>();
+        LinkedHashMap<String, String> refMap = new LinkedHashMap<>();
+        for (String arg : argList) {
+            String[] kv = arg.split("=");
             if (kv.length != 2) {
                 throw new IllegalArgumentException(
                         String.format(
                                 "Invalid computed column argument: %s. Please 
use format 'column-name=expr-name(args, ...)'.",
-                                columnArg));
+                                arg));
             }
-            String columnName = kv[0].trim();
             String expression = kv[1].trim();
             // parse expression
             int left = expression.indexOf('(');
@@ -69,101 +94,21 @@ public class ComputedColumnUtils {
                     String.format(
                             "Invalid expression: %s. Please use format 
'expr-name(args, ...)'.",
                             expression));
-
             String exprName = expression.substring(0, left);
             String[] args = expression.substring(left + 1, right).split(",");
-            checkArgument(args.length >= 1, "Computed column needs at least 
one argument.");
-
-            computedColumns.add(
-                    new ComputedColumn(
-                            columnName,
-                            Expression.create(typeMapping, caseSensitive, 
exprName, args)));
-        }
-
-        return sortComputedColumns(computedColumns);
-    }
-
-    @VisibleForTesting
-    public static List<ComputedColumn> 
sortComputedColumns(List<ComputedColumn> columns) {
-        Set<String> columnNames = new HashSet<>();
-        for (ComputedColumn col : columns) {
-            columnNames.add(col.columnName());
-        }
 
-        // For simple processing, no reference or referring to another 
computed column, means
-        // independent
-        List<ComputedColumn> independent = new ArrayList<>();
-        List<ComputedColumn> dependent = new ArrayList<>();
-
-        for (ComputedColumn col : columns) {
-            if (col.fieldReference() == null || 
!columnNames.contains(col.fieldReference())) {
-                independent.add(col);
-            } else {
-                dependent.add(col);
-            }
-        }
-
-        // Sort dependent columns with topological sort
-        Map<String, ComputedColumn> columnMap = new HashMap<>();
-        Map<String, Set<String>> reverseDependencies = new HashMap<>();
-
-        for (ComputedColumn col : dependent) {
-            columnMap.put(col.columnName(), col);
-            reverseDependencies
-                    .computeIfAbsent(col.fieldReference(), k -> new 
HashSet<>())
-                    .add(col.columnName());
-        }
-
-        List<ComputedColumn> sortedDependent = new ArrayList<>();
-        Set<String> visited = new HashSet<>();
-        Set<String> tempMark = new HashSet<>(); // For cycle detection
-
-        for (ComputedColumn col : dependent) {
-            if (!visited.contains(col.columnName())) {
-                dfs(
-                        col.columnName(),
-                        reverseDependencies,
-                        columnMap,
-                        sortedDependent,
-                        visited,
-                        tempMark);
-            }
+            // args[0] may be empty string, eg. "cal_col=now()"
+            eqMap.put(kv[0].trim(), Tuple2.of(exprName, args));
+            refMap.put(kv[0].trim(), args[0].trim());
         }
 
-        Collections.reverse(sortedDependent);
+        List<String> sortedKeys = DfsSort.sortKeys(refMap);
 
-        // Independent should precede dependent
-        List<ComputedColumn> result = new ArrayList<>();
-        result.addAll(independent);
-        result.addAll(sortedDependent);
-
-        return result;
-    }
-
-    private static void dfs(
-            String node,
-            Map<String, Set<String>> reverseDependencies,
-            Map<String, ComputedColumn> columnMap,
-            List<ComputedColumn> sorted,
-            Set<String> visited,
-            Set<String> tempMark) {
-        if (tempMark.contains(node)) {
-            throw new IllegalArgumentException("Cycle detected: " + node);
-        }
-        if (visited.contains(node)) {
-            return;
+        LinkedHashMap<String, Tuple2<String, String[]>> sortedMap =
+                new LinkedHashMap<>(refMap.size());
+        for (String key : sortedKeys) {
+            sortedMap.put(key, eqMap.get(key));
         }
-
-        tempMark.add(node);
-        ComputedColumn current = columnMap.get(node);
-
-        // Process the dependencies
-        for (String dependent : reverseDependencies.getOrDefault(node, 
Collections.emptySet())) {
-            dfs(dependent, reverseDependencies, columnMap, sorted, visited, 
tempMark);
-        }
-
-        tempMark.remove(node);
-        visited.add(node);
-        sorted.add(current);
+        return sortedMap;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 9fdc4606a0..087fe15e67 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -24,7 +24,6 @@ import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypeJsonParser;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SerializableSupplier;
 import org.apache.paimon.utils.StringUtils;
@@ -51,18 +50,12 @@ public interface Expression extends Serializable {
     /** Return name of referenced field. */
     String fieldReference();
 
-    /** Return {@link DataType} of referenced field. */
-    DataType fieldReferenceType();
-
     /** Return {@link DataType} of computed value. */
     DataType outputType();
 
     /** Compute value from given input. Input and output are serialized to 
string. */
     String eval(String input);
 
-    /** Compute value from given input. Input and output are serialized to 
string. */
-    String eval(String input, DataType inputType);
-
     /** Return name of this expression. */
     default String name() {
         return null;
@@ -73,7 +66,7 @@ public interface Expression extends Serializable {
         YEAR(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -83,7 +76,7 @@ public interface Expression extends Serializable {
         MONTH(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -93,7 +86,7 @@ public interface Expression extends Serializable {
         DAY(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -103,7 +96,7 @@ public interface Expression extends Serializable {
         HOUR(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -113,7 +106,7 @@ public interface Expression extends Serializable {
         MINUTE(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -123,7 +116,7 @@ public interface Expression extends Serializable {
         SECOND(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -133,7 +126,7 @@ public interface Expression extends Serializable {
         DATE_FORMAT(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return DateFormat.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -142,13 +135,13 @@ public interface Expression extends Serializable {
         SUBSTRING(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return substring(referencedField.field(), 
referencedField.literals());
                 }),
         TRUNCATE(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return truncate(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -159,7 +152,7 @@ public interface Expression extends Serializable {
         UPPER(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return new UpperExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -168,7 +161,7 @@ public interface Expression extends Serializable {
         LOWER(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return new LowerExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -177,7 +170,7 @@ public interface Expression extends Serializable {
         TRIM(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, 
args);
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
                     return new TrimExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -215,16 +208,16 @@ public interface Expression extends Serializable {
     /** Referenced field in expression input parameters. */
     class ReferencedField {
         private final String field;
-        @Nullable private final DataType fieldType;
+        private final DataType fieldType;
         private final String[] literals;
 
-        private ReferencedField(String field, @Nullable DataType fieldType, 
String[] literals) {
+        private ReferencedField(String field, DataType fieldType, String[] 
literals) {
             this.field = field;
             this.fieldType = fieldType;
             this.literals = literals;
         }
 
-        public static ReferencedField create(
+        public static ReferencedField checkArgument(
                 Map<String, DataType> typeMapping, boolean caseSensitive, 
String... args) {
             String referencedField = args[0].trim();
             String[] literals =
@@ -233,13 +226,11 @@ public interface Expression extends Serializable {
                     StringUtils.toLowerCaseIfNeed(referencedField, 
caseSensitive);
 
             DataType fieldType =
-                    typeMapping.isEmpty()
-                            ? null
-                            : checkNotNull(
-                                    typeMapping.get(referencedFieldCheckForm),
-                                    String.format(
-                                            "Referenced field '%s' is not in 
given fields: %s.",
-                                            referencedFieldCheckForm, 
typeMapping.keySet()));
+                    checkNotNull(
+                            typeMapping.get(referencedFieldCheckForm),
+                            String.format(
+                                    "Referenced field '%s' is not in given 
fields: %s.",
+                                    referencedFieldCheckForm, 
typeMapping.keySet()));
             return new ReferencedField(referencedField, fieldType, literals);
         }
 
@@ -335,22 +326,16 @@ public interface Expression extends Serializable {
         private static final List<Integer> SUPPORTED_PRECISION = 
Arrays.asList(0, 3, 6, 9);
 
         private final String fieldReference;
-        @Nullable private DataType fieldReferenceType;
-        @Nullable private Integer precision;
+        @Nullable private final Integer precision;
 
         private transient Function<LocalDateTime, T> converter;
 
         private TemporalExpressionBase(
-                String fieldReference, @Nullable DataType fieldType, @Nullable 
Integer precision) {
+                String fieldReference, DataType fieldType, @Nullable Integer 
precision) {
             this.fieldReference = fieldReference;
-            this.fieldReferenceType = fieldType;
 
             // when the input is INTEGER_NUMERIC, the precision must be set
-            if (fieldType != null
-                    && fieldType
-                            .getTypeRoot()
-                            .getFamilies()
-                            .contains(DataTypeFamily.INTEGER_NUMERIC)
+            if 
(fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
                     && precision == null) {
                 precision = 0;
             }
@@ -369,11 +354,6 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
-        @Override
-        public DataType fieldReferenceType() {
-            return fieldReferenceType;
-        }
-
         /** If not, this must be overridden! */
         @Override
         public DataType outputType() {
@@ -390,21 +370,6 @@ public interface Expression extends Serializable {
             return String.valueOf(result);
         }
 
-        @Override
-        public String eval(String input, DataType inputType) {
-            if (this.fieldReferenceType == null) {
-                this.fieldReferenceType = inputType;
-
-                // when the input is INTEGER_NUMERIC, the precision must be set
-                if 
(inputType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
-                        && precision == null) {
-                    this.precision = 0;
-                }
-            }
-
-            return eval(input);
-        }
-
         private LocalDateTime toLocalDateTime(String input) {
             if (precision == null) {
                 return DateTimeUtils.toLocalDateTime(input, 9);
@@ -460,7 +425,7 @@ public interface Expression extends Serializable {
 
         private static TemporalToIntConverter create(
                 String fieldReference,
-                @Nullable DataType fieldType,
+                DataType fieldType,
                 SerializableSupplier<Function<LocalDateTime, Integer>> 
converterSupplier,
                 String... literals) {
             checkArgument(
@@ -539,11 +504,6 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
-        @Override
-        public DataType fieldReferenceType() {
-            return new VarCharType();
-        }
-
         @Override
         public DataType outputType() {
             return DataTypes.STRING();
@@ -564,11 +524,6 @@ public interface Expression extends Serializable {
                                 input, beginInclusive, endExclusive));
             }
         }
-
-        @Override
-        public String eval(String input, DataType inputType) {
-            return eval(input);
-        }
     }
 
     /** Truncate numeric/decimal/string value. */
@@ -577,11 +532,11 @@ public interface Expression extends Serializable {
 
         private final String fieldReference;
 
-        @Nullable private DataType fieldType;
+        private final DataType fieldType;
 
         private final int width;
 
-        TruncateComputer(String fieldReference, @Nullable DataType fieldType, 
String literal) {
+        TruncateComputer(String fieldReference, DataType fieldType, String 
literal) {
             this.fieldReference = fieldReference;
             this.fieldType = fieldType;
             try {
@@ -599,11 +554,6 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
-        @Override
-        public DataType fieldReferenceType() {
-            return fieldType;
-        }
-
         @Override
         public DataType outputType() {
             return fieldType;
@@ -638,14 +588,6 @@ public interface Expression extends Serializable {
             }
         }
 
-        @Override
-        public String eval(String input, DataType inputType) {
-            if (this.fieldType == null) {
-                this.fieldType = inputType;
-            }
-            return eval(input);
-        }
-
         private short truncateShort(int width, short value) {
             return (short) (value - (((value % width) + width) % width));
         }
@@ -690,11 +632,6 @@ public interface Expression extends Serializable {
             return null;
         }
 
-        @Override
-        public DataType fieldReferenceType() {
-            return null;
-        }
-
         @Override
         public DataType outputType() {
             return dataType;
@@ -704,11 +641,6 @@ public interface Expression extends Serializable {
         public String eval(String input) {
             return value;
         }
-
-        @Override
-        public String eval(String input, DataType inputType) {
-            return value;
-        }
     }
 
     /** Get current timestamp. */
@@ -718,11 +650,6 @@ public interface Expression extends Serializable {
             return null;
         }
 
-        @Override
-        public DataType fieldReferenceType() {
-            return null;
-        }
-
         @Override
         public DataType outputType() {
             return DataTypes.TIMESTAMP(3);
@@ -732,11 +659,6 @@ public interface Expression extends Serializable {
         public String eval(String input) {
             return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
         }
-
-        @Override
-        public String eval(String input, DataType inputType) {
-            return eval(input);
-        }
     }
 
     /** Convert string to upper case. */
@@ -797,14 +719,12 @@ public interface Expression extends Serializable {
     abstract class NoLiteralsStringExpressionBase implements Expression {
 
         private final String fieldReference;
-        @Nullable protected DataType fieldReferenceType;
 
         public NoLiteralsStringExpressionBase(
-                String fieldReference, @Nullable DataType fieldType, String... 
literals) {
+                String fieldReference, DataType fieldType, String... literals) 
{
             this.fieldReference = fieldReference;
-            this.fieldReferenceType = fieldType;
             checkArgument(
-                    fieldType == null || fieldType.getTypeRoot() == 
DataTypeRoot.VARCHAR,
+                    fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
                     String.format(
                             "'%s' expression only supports type root of '%s', 
but found '%s'.",
                             name(), DataTypeRoot.VARCHAR, 
fieldType.getTypeRoot()));
@@ -824,23 +744,5 @@ public interface Expression extends Serializable {
         public String fieldReference() {
             return fieldReference;
         }
-
-        @Override
-        public DataType fieldReferenceType() {
-            return fieldReferenceType;
-        }
-
-        @Override
-        public String eval(String input, DataType inputType) {
-            if (this.fieldReferenceType == null) {
-                checkArgument(
-                        inputType.getTypeRoot() == DataTypeRoot.VARCHAR,
-                        String.format(
-                                "'%s' expression only supports type root of 
'%s', but found '%s'.",
-                                name(), DataTypeRoot.VARCHAR, 
inputType.getTypeRoot()));
-                this.fieldReferenceType = inputType;
-            }
-            return eval(input);
-        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 8008cd7ca9..85442067b9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -104,17 +103,8 @@ public abstract class AbstractRecordParser
             Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
         computedColumns.forEach(
                 computedColumn -> {
-                    String result;
-                    if (computedColumn.fieldReference() != null
-                            && computedColumn.fieldReferenceType() == null) {
-                        DataType inputType =
-                                
schemaBuilder.getFieldType(computedColumn.fieldReference());
-                        result =
-                                computedColumn.eval(
-                                        
rowData.get(computedColumn.fieldReference()), inputType);
-                    } else {
-                        result = 
computedColumn.eval(rowData.get(computedColumn.fieldReference()));
-                    }
+                    String result =
+                            
computedColumn.eval(rowData.get(computedColumn.fieldReference()));
 
                     rowData.put(computedColumn.columnName(), result);
                     schemaBuilder.column(computedColumn.columnName(), 
computedColumn.columnType());
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 0fffdf0b98..6c8f2ae324 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -235,21 +236,40 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
         Map<String, DebeziumEvent.Field> fields = 
schema.beforeAndAfterFields();
 
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+
         LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+
         for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) 
{
             String fieldName = field.getKey();
-            String mySqlType = field.getValue().type();
+            String debeziumType = field.getValue().type();
+            String className = field.getValue().name();
+
+            // record the field data type for computed columns reference
+            JsonNode parametersNode = field.getValue().parameters();
+            Map<String, String> parametersMap =
+                    isNull(parametersNode)
+                            ? Collections.emptyMap()
+                            : JsonSerdeUtil.convertValue(
+                                    parametersNode,
+                                    new TypeReference<HashMap<String, 
String>>() {});
+
+            DataType paimonDataType =
+                    DebeziumSchemaUtils.toDataType(debeziumType, className, 
parametersMap);
+            schemaBuilder.column(fieldName, paimonDataType);
+
             JsonNode objectValue = recordRow.get(fieldName);
             if (isNull(objectValue)) {
                 continue;
             }
 
-            String className = field.getValue().name();
             String oldValue = objectValue.asText();
             String newValue =
                     DebeziumSchemaUtils.transformRawValue(
                             oldValue,
-                            mySqlType,
+                            debeziumType,
                             className,
                             typeMapping,
                             objectValue,
@@ -259,9 +279,12 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
         // generate values of computed columns
         for (ComputedColumn computedColumn : computedColumns) {
-            resultMap.put(
-                    computedColumn.columnName(),
-                    
computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+            String refName = computedColumn.fieldReference();
+
+            resultMap.put(computedColumn.columnName(), 
computedColumn.eval(resultMap.get(refName)));
+
+            // remember the computed column data type for later reference by 
other computed columns
+            schemaBuilder.column(computedColumn.columnName(), 
computedColumn.columnType());
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index af114cb920..c2565c1f2d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -121,7 +121,7 @@ public class PostgresRecordParser
         extractRecords().forEach(out::collect);
     }
 
-    private CdcSchema extractSchema(DebeziumEvent.Field schema) {
+    private CdcSchema extractSchema(DebeziumEvent.Field schema, 
CdcSchema.Builder schemaBuilder) {
         Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
         Preconditions.checkArgument(
                 !afterFields.isEmpty(),
@@ -129,7 +129,6 @@ public class PostgresRecordParser
                         + "Please make sure that `includeSchema` is true "
                         + "in the JsonDebeziumDeserializationSchema you 
created");
 
-        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
         afterFields.forEach(
                 (key, value) -> {
                     DataType dataType = extractFieldType(value);
@@ -207,15 +206,16 @@ public class PostgresRecordParser
 
     private List<RichCdcMultiplexRecord> extractRecords() {
         List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+        CdcSchema schema = extractSchema(root.schema(), schemaBuilder);
 
-        Map<String, String> before = extractRow(root.payload().before());
+        Map<String, String> before = extractRow(root.payload().before(), 
schemaBuilder);
         if (!before.isEmpty()) {
             records.add(createRecord(RowKind.DELETE, before));
         }
 
-        Map<String, String> after = extractRow(root.payload().after());
+        Map<String, String> after = extractRow(root.payload().after(), 
schemaBuilder);
         if (!after.isEmpty()) {
-            CdcSchema schema = extractSchema(root.schema());
             records.add(
                     new RichCdcMultiplexRecord(
                             databaseName,
@@ -227,7 +227,7 @@ public class PostgresRecordParser
         return records;
     }
 
-    private Map<String, String> extractRow(JsonNode recordRow) {
+    private Map<String, String> extractRow(JsonNode recordRow, 
CdcSchema.Builder schemaBuilder) {
         if (isNull(recordRow)) {
             return new HashMap<>();
         }
@@ -346,9 +346,8 @@ public class PostgresRecordParser
 
         // generate values of computed columns
         for (ComputedColumn computedColumn : computedColumns) {
-            resultMap.put(
-                    computedColumn.columnName(),
-                    
computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+            String refName = computedColumn.fieldReference();
+            resultMap.put(computedColumn.columnName(), 
computedColumn.eval(resultMap.get(refName)));
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java
new file mode 100644
index 0000000000..555c64ee2a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** DFS sort algorithm for topological sorting of a DAG (Directed Acyclic 
Graph). */
+public class DfsSort {
+    public static <K> LinkedHashMap<K, K> sort(LinkedHashMap<K, K> depMap) {
+        List<K> sortedKeys = sortKeys(depMap);
+        LinkedHashMap<K, K> sortedMap = new LinkedHashMap<>();
+        for (K key : sortedKeys) {
+            sortedMap.put(key, depMap.get(key));
+        }
+        return sortedMap;
+    }
+
+    public static <K> List<K> sortKeys(LinkedHashMap<K, K> depMap) {
+
+        Map<K, Set<K>> revMap = new LinkedHashMap<>();
+
+        List<K> noDeps = new ArrayList<>();
+
+        for (Map.Entry<K, K> entry : depMap.entrySet()) {
+            K key = entry.getKey();
+            K val = entry.getValue();
+
+            if (val == null || !depMap.containsKey(val)) {
+                noDeps.add(key);
+            } else {
+                revMap.computeIfAbsent(val, k -> new HashSet<>()).add(key);
+            }
+        }
+
+        List<K> sorted = new ArrayList<>();
+
+        Set<K> visited = new HashSet<>();
+        Set<K> tempMark = new HashSet<>(); // for cycle reference detection
+
+        for (Map.Entry<K, K> entry : depMap.entrySet()) {
+            K key = entry.getKey();
+            K val = entry.getValue();
+            if (val == null || !depMap.containsKey(val)) {
+                continue;
+            }
+
+            if (!visited.contains(key)) {
+                dfs(key, revMap, sorted, visited, tempMark);
+            }
+        }
+
+        Collections.reverse(noDeps);
+
+        sorted.addAll(noDeps);
+
+        Collections.reverse(sorted);
+
+        return sorted;
+    }
+
+    private static <K> void dfs(
+            K node, Map<K, Set<K>> revMap, List<K> sorted, Set<K> visited, 
Set<K> tempMark) {
+        if (tempMark.contains(node)) {
+            throw new IllegalArgumentException("Cycle detected: " + node);
+        }
+        if (visited.contains(node)) {
+            return;
+        }
+
+        tempMark.add(node);
+
+        // Process the dependencies
+        for (K dependent : revMap.getOrDefault(node, Collections.emptySet())) {
+            dfs(dependent, revMap, sorted, visited, tempMark);
+        }
+
+        tempMark.remove(node);
+        visited.add(node);
+        sorted.add(node);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
index 7a615fdbc8..229dcc69bb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -184,10 +184,7 @@ public class CdcSchema implements Serializable {
         /** Returns the data type of the specified field. */
         public DataType getFieldType(String fieldName) {
             DataField field = columns.get(fieldName);
-            if (field == null) {
-                throw new IllegalArgumentException("Field " + fieldName + " 
not found in schema.");
-            }
-            return field.type();
+            return field == null ? null : field.type();
         }
 
         /** Returns an instance of an unresolved {@link CdcSchema}. */
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
index 5ab3f48817..d04f91053b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
@@ -18,44 +18,47 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.sortComputedColumns;
+import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 import static org.junit.Assert.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test for ComputedColumnUtils. */
 public class ComputedColumnUtilsTest {
+
     @Test
-    public void testSortComputedColumns() {
-        List<ComputedColumn> columns =
+    public void testComputedColumns() {
+        List<String> calColArgs =
                 Arrays.asList(
-                        new ComputedColumn("A", Expression.substring("B", 
"1")),
-                        new ComputedColumn("B", 
Expression.substring("ExistedColumn", "1")),
-                        new ComputedColumn("C", Expression.cast("No 
Reference")),
-                        new ComputedColumn("D", Expression.substring("A", 
"1")),
-                        new ComputedColumn("E", Expression.substring("C", 
"1")));
+                        "A=substring(B, 1)",
+                        "B=substring(ExistedColumn,1)",
+                        "C=now()",
+                        "D=substring(A, 1)",
+                        "E=substring(C,1)");
+        List<DataField> physicalFields =
+                Arrays.asList(new DataField(1, "ExistedColumn", 
DataTypes.STRING()));
+        List<ComputedColumn> columns = buildComputedColumns(calColArgs, 
physicalFields);
 
-        List<ComputedColumn> sortedColumns = sortComputedColumns(columns);
         assertEquals(
                 Arrays.asList("B", "C", "E", "A", "D"),
-                sortedColumns.stream()
-                        .map(ComputedColumn::columnName)
-                        .collect(Collectors.toList()));
+                
columns.stream().map(ComputedColumn::columnName).collect(Collectors.toList()));
     }
 
     @Test
     public void testCycleReference() {
-        List<ComputedColumn> columns =
-                Arrays.asList(
-                        new ComputedColumn("A", Expression.substring("B", 
"1")),
-                        new ComputedColumn("B", Expression.substring("C", 
"1")),
-                        new ComputedColumn("C", Expression.substring("A", 
"1")));
-
-        assertThrows(IllegalArgumentException.class, () -> 
sortComputedColumns(columns));
+        List<String> calColArgs =
+                Arrays.asList("A=substring(B, 1)", "B=substring(C, 1)", 
"C=substring(A, 1)");
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> buildComputedColumns(calColArgs, 
Collections.emptyList()));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index b48a1c79cf..1200629351 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1131,6 +1131,70 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         epochSecond * 1_000_000_000 + nano));
     }
 
+    @Test
+    @Timeout(60)
+    public void testComputedColumnsCrossReference() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_computed_column2");
+
+        List<String> computedColumnDefs =
+                Arrays.asList(
+                        "_lower_of_upper=lower(_upper)",
+                        "_upper=upper(_value)",
+                        "_trim_lower=trim(_lower_of_upper)",
+                        "_constant=cast(11,INT)");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withPrimaryKeys("pk")
+                        .withComputedColumnArgs(computedColumnDefs)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            statement.execute("USE " + DATABASE_NAME);
+            statement.executeUpdate(
+                    "INSERT INTO test_computed_column2 VALUES (1, 
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10', ' vaLUE ')");
+            statement.executeUpdate(
+                    "INSERT INTO test_computed_column2 VALUES (2, 
'2023-03-23', null, null, null)");
+        }
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.DATE(),
+                            DataTypes.TIMESTAMP(0),
+                            DataTypes.TIMESTAMP(0),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {
+                            "pk",
+                            "_date",
+                            "_datetime",
+                            "_timestamp",
+                            "_value",
+                            "_upper",
+                            "_constant",
+                            "_lower_of_upper",
+                            "_trim_lower"
+                        });
+        List<String> expected =
+                Arrays.asList(
+                        // sort according to reference
+
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,  
vaLUE ,  VALUE , 11,  value , value]",
+                        "+I[2, 19439, NULL, NULL, NULL, NULL, 11, NULL, 
NULL]");
+
+        waitForResult(expected, table, rowType, Arrays.asList("pk"));
+    }
+
     @Test
     @Timeout(60)
     public void testSyncShards() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java
new file mode 100644
index 0000000000..180474d2cf
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link DfsSort}. */
+public class DfsSortTestTest {
+    @Test
+    public void testSortKeys() {
+        LinkedHashMap<String, String> refs = new LinkedHashMap<>();
+        refs.put("A", "B");
+        refs.put("B", "O");
+        refs.put("C", null);
+        refs.put("D", "A");
+        refs.put("E", "C");
+        refs.put("F", "");
+
+        List<String> sorted = DfsSort.sortKeys(refs);
+        assertEquals(Arrays.asList("B", "C", "F", "E", "A", "D"), sorted);
+    }
+
+    @Test
+    public void testCycleReference() {
+        LinkedHashMap<String, String> refs = new LinkedHashMap<>();
+        refs.put("A", "B");
+        refs.put("B", "C");
+        refs.put("C", "A");
+
+        assertThrows(IllegalArgumentException.class, () -> DfsSort.sort(refs));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index ae0186cf70..a3b42f9440 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -296,6 +296,15 @@ CREATE TABLE test_computed_column (
     PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_computed_column2 (
+    pk INT,
+    _date DATE,
+    _datetime DATETIME,
+    _timestamp TIMESTAMP,
+    _value VARCHAR(10),
+    PRIMARY KEY (pk)
+);
+
 CREATE TABLE test_time_to_int_epoch (
     pk INT,
     _second_val0 INT,

Reply via email to