This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f512c08c [cdc] Calculated columns support constant value. (#3156)
6f512c08c is described below

commit 6f512c08c1d4a48caadbe3ca9720f268f63541f1
Author: Kerwin <[email protected]>
AuthorDate: Wed Apr 10 12:09:48 2024 +0800

    [cdc] Calculated columns support constant value. (#3156)
---
 .../shortcodes/generated/other_functions.html      |   4 +
 .../apache/paimon/types/DataTypeJsonParser.java    |   2 +-
 .../paimon/flink/action/cdc/ComputedColumn.java    |   3 +-
 .../flink/action/cdc/ComputedColumnUtils.java      |  19 +-
 .../apache/paimon/flink/action/cdc/Expression.java | 265 +++++++++++++++++----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  13 +-
 6 files changed, 237 insertions(+), 69 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/other_functions.html 
b/docs/layouts/shortcodes/generated/other_functions.html
index f367c97d0..2135fc817 100644
--- a/docs/layouts/shortcodes/generated/other_functions.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -39,5 +39,9 @@ under the License.
              If the column is an INT or LONG, truncate(column,width) will 
truncate the number with the algorithm `v - (((v % W) + W) % W)`. The 
`redundant` compute part is to keep the result always positive.
              If the column is a DECIMAL, truncate(column,width) will truncate 
the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then 
return `v - (v % scaled_W)`.</td>
     </tr>
+    <tr>
+        <td><h5>cast(value,dataType)</h5></td>
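+        <td>Get a constant value. The output is an atomic type, such as 
STRING, INT, BOOLEAN, etc. The dataType argument is optional and defaults to STRING. The value is taken literally and cannot contain a comma.</td>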
+    </tr>
     </tbody>
 </table>
\ No newline at end of file
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java 
b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
index 0af68df9d..1afde1b2d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
@@ -76,7 +76,7 @@ public final class DataTypeJsonParser {
         throw new IllegalArgumentException("Can not parse: " + json);
     }
 
-    private static DataType parseAtomicTypeSQLString(String string) {
+    public static DataType parseAtomicTypeSQLString(String string) {
         List<Token> tokens = tokenize(string);
         TokenParser converter = new TokenParser(string, tokens);
         return converter.parseTokens();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index 3b262fafd..5e9041a12 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -48,6 +48,7 @@ public class ComputedColumn implements Serializable {
         return expression.outputType();
     }
 
+    @Nullable
     public String fieldReference() {
         return expression.fieldReference();
     }
@@ -55,7 +56,7 @@ public class ComputedColumn implements Serializable {
     /** Compute column's value from given argument. Return null if input is 
null. */
     @Nullable
     public String eval(@Nullable String input) {
-        if (input == null) {
+        if (fieldReference() != null && input == null) {
             return null;
         }
         return expression.eval(input);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index a18355ca6..24ca0599b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -21,10 +21,8 @@ package org.apache.paimon.flink.action.cdc;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -71,25 +69,10 @@ public class ComputedColumnUtils {
             String[] args = expression.substring(left + 1, right).split(",");
             checkArgument(args.length >= 1, "Computed column needs at least 
one argument.");
 
-            String fieldReference = args[0].trim();
-            String[] literals =
-                    
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
-            String fieldReferenceCheckForm =
-                    StringUtils.caseSensitiveConversion(fieldReference, 
caseSensitive);
-            checkArgument(
-                    typeMapping.containsKey(fieldReferenceCheckForm),
-                    String.format(
-                            "Referenced field '%s' is not in given fields: 
%s.",
-                            fieldReferenceCheckForm, typeMapping.keySet()));
-
             computedColumns.add(
                     new ComputedColumn(
                             columnName,
-                            Expression.create(
-                                    exprName,
-                                    fieldReference,
-                                    typeMapping.get(fieldReferenceCheckForm),
-                                    literals)));
+                            Expression.create(typeMapping, caseSensitive, 
exprName, args)));
         }
 
         return computedColumns;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 417758f4b..2e0a13192 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -21,9 +21,11 @@ package org.apache.paimon.flink.action.cdc;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeJsonParser;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -34,25 +36,16 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Produce a computation result for computed column. */
 public interface Expression extends Serializable {
 
-    List<String> SUPPORTED_EXPRESSION =
-            Arrays.asList(
-                    "year",
-                    "month",
-                    "day",
-                    "hour",
-                    "minute",
-                    "second",
-                    "date_format",
-                    "substring",
-                    "truncate");
-
     /** Return name of referenced field. */
     String fieldReference();
 
@@ -62,40 +55,179 @@ public interface Expression extends Serializable {
     /** Compute value from given input. Input and output are serialized to 
string. */
     String eval(String input);
 
+    /** Expression function. */
+    enum ExpressionFunction {
+        YEAR(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getYear,
+                            referencedField.literals());
+                }),
+        MONTH(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getMonthValue,
+                            referencedField.literals());
+                }),
+        DAY(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getDayOfMonth,
+                            referencedField.literals());
+                }),
+        HOUR(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getHour,
+                            referencedField.literals());
+                }),
+        MINUTE(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getMinute,
+                            referencedField.literals());
+                }),
+        SECOND(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return TemporalToIntConverter.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            () -> LocalDateTime::getSecond,
+                            referencedField.literals());
+                }),
+        DATE_FORMAT(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return DateFormat.create(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            referencedField.literals());
+                }),
+        SUBSTRING(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return substring(referencedField.field(), 
referencedField.literals());
+                }),
+        TRUNCATE(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return truncate(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            referencedField.literals());
+                }),
+        CAST((typeMapping, caseSensitive, args) -> cast(args));
+
+        public final ExpressionCreator creator;
+
+        ExpressionFunction(ExpressionCreator creator) {
+            this.creator = creator;
+        }
+
+        public ExpressionCreator getCreator() {
+            return creator;
+        }
+
+        private static final Map<String, ExpressionCreator> 
EXPRESSION_FUNCTIONS =
+                Arrays.stream(ExpressionFunction.values())
+                        .collect(
+                                Collectors.toMap(
+                                        value -> value.name().toLowerCase(),
+                                        ExpressionFunction::getCreator));
+
+        public static ExpressionCreator creator(String exprName) {
+            return EXPRESSION_FUNCTIONS.get(exprName.toLowerCase());
+        }
+    }
+
+    /** Expression creator. */
+    @FunctionalInterface
+    interface ExpressionCreator {
+        Expression create(Map<String, DataType> typeMapping, boolean 
caseSensitive, String[] args);
+    }
+
+    /** Referenced field in expression input parameters. */
+    class ReferencedField {
+        private final String field;
+        private final DataType fieldType;
+        private final String[] literals;
+
+        private ReferencedField(String field, DataType fieldType, String[] 
literals) {
+            this.field = field;
+            this.fieldType = fieldType;
+            this.literals = literals;
+        }
+
+        public static ReferencedField checkArgument(
+                Map<String, DataType> typeMapping, boolean caseSensitive, 
String... args) {
+            String referencedField = args[0].trim();
+            String[] literals =
+                    
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
+            String referencedFieldCheckForm =
+                    StringUtils.caseSensitiveConversion(referencedField, 
caseSensitive);
+
+            DataType fieldType =
+                    checkNotNull(
+                            typeMapping.get(referencedFieldCheckForm),
+                            String.format(
+                                    "Referenced field '%s' is not in given 
fields: %s.",
+                                    referencedFieldCheckForm, 
typeMapping.keySet()));
+            return new ReferencedField(referencedField, fieldType, literals);
+        }
+
+        public String field() {
+            return field;
+        }
+
+        public DataType fieldType() {
+            return fieldType;
+        }
+
+        public String[] literals() {
+            return literals;
+        }
+    }
+
     static Expression create(
-            String exprName, String fieldReference, DataType fieldType, 
String... literals) {
-        switch (exprName.toLowerCase()) {
-            case "year":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getYear, literals);
-            case "month":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getMonthValue, literals);
-            case "day":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getDayOfMonth, literals);
-            case "hour":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getHour, literals);
-            case "minute":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getMinute, literals);
-            case "second":
-                return TemporalToIntConverter.create(
-                        fieldReference, fieldType, () -> 
LocalDateTime::getSecond, literals);
-            case "date_format":
-                return DateFormat.create(fieldReference, fieldType, literals);
-            case "substring":
-                return substring(fieldReference, literals);
-            case "truncate":
-                return truncate(fieldReference, fieldType, literals);
-                // TODO: support more expression
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Unsupported expression: %s. Supported 
expressions are: %s",
-                                exprName, String.join(",", 
SUPPORTED_EXPRESSION)));
+            Map<String, DataType> typeMapping,
+            boolean caseSensitive,
+            String exprName,
+            String... args) {
+
+        ExpressionCreator function = 
ExpressionFunction.creator(exprName.toLowerCase());
+        if (function == null) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported expression: %s. Supported expressions 
are: %s",
+                            exprName,
+                            String.join(",", 
ExpressionFunction.EXPRESSION_FUNCTIONS.keySet())));
         }
+        return function.create(typeMapping, caseSensitive, args);
     }
 
     static Expression substring(String fieldReference, String... literals) {
@@ -137,6 +269,21 @@ public interface Expression extends Serializable {
         return new TruncateComputer(fieldReference, fieldType, literals[0]);
     }
 
+    static Expression cast(String... literals) {
+        checkArgument(
+                literals.length == 1 || literals.length == 2,
+                String.format(
+                        "'cast' expression supports one or two arguments, but 
found '%s'.",
+                        literals.length));
+        DataType dataType = DataTypes.STRING();
+        if (literals.length == 2) {
+            dataType = 
DataTypeJsonParser.parseAtomicTypeSQLString(literals[1]);
+        }
+        return new CastExpression(literals[0], dataType);
+    }
+
+    // ======================== Expression Implementations 
========================
+
     /** Expression to handle temporal value. */
     abstract class TemporalExpressionBase<T> implements Expression {
 
@@ -431,4 +578,34 @@ public interface Expression extends Serializable {
             return value.subtract(remainder);
         }
     }
+
+    /** Get constant value. */
+    final class CastExpression implements Expression {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String value;
+
+        private final DataType dataType;
+
+        private CastExpression(String value, DataType dataType) {
+            this.value = value;
+            this.dataType = dataType;
+        }
+
+        @Override
+        public String fieldReference() {
+            return null;
+        }
+
+        @Override
+        public DataType outputType() {
+            return dataType;
+        }
+
+        @Override
+        public String eval(String input) {
+            return value;
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index fb54763cb..67358f848 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -734,7 +734,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
                         "_substring_date1=substring(_date,2)",
                         "_substring_date2=substring(_timestamp,5,10)",
-                        "_truncate_date=trUNcate(pk,2)"); // test 
case-insensitive too
+                        "_truncate_date=trUNcate(pk,2)", // test 
case-insensitive too
+                        "_constant=cast(11,INT)");
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
@@ -785,7 +786,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.STRING(),
                             DataTypes.STRING(),
                             DataTypes.STRING(),
-                            DataTypes.INT().notNull()
+                            DataTypes.INT().notNull(),
+                            DataTypes.INT()
                         },
                         new String[] {
                             "pk",
@@ -815,12 +817,13 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             "_date_format_timestamp",
                             "_substring_date1",
                             "_substring_date2",
-                            "_truncate_date"
+                            "_truncate_date",
+                            "_constant"
                         });
         List<String> expected =
                 Arrays.asList(
-                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 
2022-01-01, 20210915, 23-03-23, 09-15, 0]",
-                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, 
NULL, 23-03-23, NULL, 2]");
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 
2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]",
+                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, 
NULL, 23-03-23, NULL, 2, 11]");
         waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 

Reply via email to