This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f93875feb [Feature][Transform-V2] Support case when Expression (#6123)
4f93875feb is described below

commit 4f93875feb1a165f149bdb6772f580f572b8d3da
Author: ZhilinLi <[email protected]>
AuthorDate: Thu Jan 18 10:24:38 2024 +0800

    [Feature][Transform-V2] Support case when Expression (#6123)
---
 docs/en/transform-v2/sql-functions.md              |  49 ++++++++
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |   2 +
 .../test/resources/sql_transform/case_when.conf    | 136 +++++++++++++++++++++
 seatunnel-transforms-v2/pom.xml                    |   5 +
 .../transform/sql/zeta/ZetaSQLEngine.java          |   2 +-
 .../transform/sql/zeta/ZetaSQLFilter.java          |  11 +-
 .../transform/sql/zeta/ZetaSQLFunction.java        |  32 +++++
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  96 +++++++++++++++
 .../sql/zeta/functions/SystemFunction.java         |  15 +++
 9 files changed, 345 insertions(+), 3 deletions(-)

diff --git a/docs/en/transform-v2/sql-functions.md 
b/docs/en/transform-v2/sql-functions.md
index c074fab74f..ab98c2d2bf 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -915,3 +915,52 @@ Returns NULL if 'a' is equal to 'b', otherwise 'a'.
 Example:
 
 NULLIF(A, B)
+
+### CASE WHEN
+
+```
+select
+  case
+    when c_string in ('c_string') then 1
+    else 0
+  end as c_string_1,
+  case
+    when c_string not in ('c_string') then 1
+    else 0
+  end as c_string_0,
+  case
+    when c_tinyint = 117
+    and TO_CHAR(c_boolean) = 'true' then 1
+    else 0
+  end as c_tinyint_boolean_1,
+  case
+    when c_tinyint != 117
+    and TO_CHAR(c_boolean) = 'true' then 1
+    else 0
+  end as c_tinyint_boolean_0,
+  case
+    when c_tinyint != 117
+    or TO_CHAR(c_boolean) = 'true' then 1
+    else 0
+  end as c_tinyint_boolean_or_1,
+  case
+    when c_int > 1
+    and c_bigint > 1
+    and c_float > 1
+    and c_double > 1
+    and c_decimal > 1 then 1
+    else 0
+  end as c_number_1,
+  case
+    when c_tinyint <> 117 then 1
+    else 0
+  end as c_number_0
+from
+  fake
+```
+
+Evaluates the `WHEN` conditions in order and returns the `THEN` value of the
+first condition that is true. If no condition matches, the `ELSE` value is
+returned, or NULL when there is no `ELSE` branch.
+
+Example:
+
+case when c_string in ('c_string') then 1 else 0 end
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index ad3f8edfa8..54e2f0ae13 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -55,5 +55,7 @@ public class TestSQLIT extends TestSuiteBase {
         Container.ExecResult sqlAllColumns =
                 container.executeJob("/sql_transform/sql_all_columns.conf");
         Assertions.assertEquals(0, sqlAllColumns.getExitCode());
+        Container.ExecResult caseWhenSql = 
container.executeJob("/sql_transform/case_when.conf");
+        Assertions.assertEquals(0, caseWhenSql.getExitCode());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/case_when.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/case_when.conf
new file mode 100644
index 0000000000..f7828539ee
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/case_when.conf
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with a CASE WHEN query in the SQL transform
+######
+
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["c_string", true, 117, 15987, 56387395, 7084913402530365000, 
1.23, 1.23, "2924137191386439303744.39292216", "bWlJWmo=", "2023-04-22", 
"2023-04-22T23:20:58"]
+      }
+    ]
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = """
+      select case when c_string in ('c_string') then 1 else 0 end     as 
c_string_1,
+       case when c_string not in ('c_string') then 1 else 0 end as c_string_0,
+       case when c_tinyint = 117 and TO_CHAR(c_boolean)='true' then 1 else 0 
end as c_tinyint_boolean_1,
+       case when c_tinyint != 117 and TO_CHAR(c_boolean)='true' then 1 else 0 
end as c_tinyint_boolean_0,
+       case when c_tinyint != 117 or TO_CHAR(c_boolean)='true' then 1 else 0 
end as c_tinyint_boolean_or_1,
+       case when c_int > 1 and c_bigint >1 and c_float >1 and c_double > 1 and 
c_decimal > 1 then 1 else 0 end as c_number_1,
+       case when c_tinyint <> 117 then 1 else 0 end as c_number_0
+       from fake
+    """
+  }
+}
+
+
+sink {
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          }
+        ],
+        field_rules = [
+          {
+            field_name = "c_string_1"
+            field_type = "int"
+            field_value = [
+              {equals_to = 1}
+            ]
+          }, {
+            field_name = "c_string_0"
+            field_type = "int"
+            field_value = [
+              {equals_to = 0}
+            ]
+          }, {
+            field_name = "c_tinyint_boolean_1"
+            field_type = "int"
+            field_value = [
+              {equals_to = 1}
+            ]
+          }, {
+            field_name = "c_tinyint_boolean_0"
+            field_type = "int"
+            field_value = [
+              {equals_to = 0}
+            ]
+          }, {
+            field_name = "c_tinyint_boolean_or_1"
+            field_type = "int"
+            field_value = [
+              {equals_to = 1}
+            ]
+          }, {
+            field_name = "c_number_1"
+            field_type = "int"
+            field_value = [
+              {equals_to = 1}
+            ]
+          }, {
+            field_name = "c_number_0"
+            field_type = "int"
+            field_value = [
+              {equals_to = 0}
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 833b14d372..4c0a63c7dc 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -60,6 +60,11 @@
             <artifactId>seatunnel-format-json</artifactId>
             <version>${revision}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index b34dcb2c2a..1fa01c0dfa 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -78,7 +78,7 @@ public class ZetaSQLEngine implements SQLEngine {
 
         this.zetaSQLType = new ZetaSQLType(inputRowType, udfList);
         this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType, 
udfList);
-        this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction);
+        this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction, zetaSQLType);
 
         parseSQL();
     }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
index 909c3d893f..b3542663eb 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.transform.sql.zeta;
 
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 
@@ -47,9 +48,15 @@ import java.util.regex.Pattern;
 
 public class ZetaSQLFilter {
     private final ZetaSQLFunction zetaSQLFunction;
+    private final ZetaSQLType zetaSQLType;
 
-    public ZetaSQLFilter(ZetaSQLFunction zetaSQLFunction) {
+    public ZetaSQLFilter(ZetaSQLFunction zetaSQLFunction, ZetaSQLType 
zetaSQLType) {
         this.zetaSQLFunction = zetaSQLFunction;
+        this.zetaSQLType = zetaSQLType;
+    }
+
+    /**
+     * Tells whether an expression is a boolean condition, judged by the type
+     * that {@code zetaSQLType.getExpressionType} resolves for it.
+     *
+     * @param expression the parsed SQL expression to classify
+     * @return {@code true} when the resolved type equals
+     *     {@link BasicType#BOOLEAN_TYPE}
+     */
+    public boolean isConditionExpr(Expression expression) {
+        final Object resolvedType = zetaSQLType.getExpressionType(expression);
+        return BasicType.BOOLEAN_TYPE.equals(resolvedType);
     }
 
     public boolean executeFilter(Expression whereExpr, Object[] inputFields) {
@@ -252,7 +259,7 @@ public class ZetaSQLFilter {
         return Pair.of(leftVal, rightVal);
     }
 
-    private boolean equalsToExpr(Pair<Object, Object> pair) {
+    boolean equalsToExpr(Pair<Object, Object> pair) {
         Object leftVal = pair.getLeft();
         Object rightVal = pair.getRight();
         if (leftVal == null || rightVal == null) {
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 1ed6469b86..23cf4844ed 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -28,7 +28,10 @@ import 
org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.CaseExpression;
 import net.sf.jsqlparser.expression.CastExpression;
 import net.sf.jsqlparser.expression.DoubleValue;
 import net.sf.jsqlparser.expression.Expression;
@@ -39,6 +42,7 @@ import net.sf.jsqlparser.expression.NullValue;
 import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
 import net.sf.jsqlparser.expression.operators.arithmetic.Division;
@@ -164,6 +168,7 @@ public class ZetaSQLFunction {
 
     private final SeaTunnelRowType inputRowType;
     private final ZetaSQLType zetaSQLType;
+    private final ZetaSQLFilter zetaSQLFilter;
 
     private final List<ZetaUDF> udfList;
 
@@ -171,6 +176,7 @@ public class ZetaSQLFunction {
             SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType, 
List<ZetaUDF> udfList) {
         this.inputRowType = inputRowType;
         this.zetaSQLType = zetaSQLType;
+        this.zetaSQLFilter = new ZetaSQLFilter(this, zetaSQLType);
         this.udfList = udfList;
     }
 
@@ -221,6 +227,12 @@ public class ZetaSQLFunction {
             Parenthesis parenthesis = (Parenthesis) expression;
             return computeForValue(parenthesis.getExpression(), inputFields);
         }
+        if (expression instanceof CaseExpression) {
+            CaseExpression caseExpression = (CaseExpression) expression;
+            final Object value = executeCaseExpr(caseExpression, inputFields);
+            SeaTunnelDataType<?> type = 
zetaSQLType.getExpressionType(expression);
+            return SystemFunction.castAs(value, type);
+        }
         if (expression instanceof BinaryExpression) {
             return executeBinaryExpr((BinaryExpression) expression, 
inputFields);
         }
@@ -235,6 +247,26 @@ public class ZetaSQLFunction {
                 String.format("Unsupported SQL Expression: %s ", 
expression.toString()));
     }
 
+    /**
+     * Evaluates a CASE expression against one input row.
+     *
+     * <p>WHEN clauses are tried in declaration order. A clause is taken when
+     * its WHEN expression evaluates to boolean {@code true}; otherwise it is
+     * taken when {@code zetaSQLFilter.equalsToExpr} reports its value equal to
+     * the value of the optional switch expression (the operand written
+     * between CASE and the first WHEN).
+     *
+     * @param caseExpression the parsed CASE expression
+     * @param inputFields field values of the current row
+     * @return the THEN value of the first clause taken, else the ELSE value,
+     *     or {@code null} when no clause is taken and there is no ELSE
+     */
+    public Object executeCaseExpr(
+            CaseExpression caseExpression, Object[] inputFields) {
+        final Expression operand = caseExpression.getSwitchExpression();
+        Object operandValue = null;
+        if (operand != null) {
+            operandValue = computeForValue(operand, inputFields);
+        }
+
+        for (WhenClause clause : caseExpression.getWhenClauses()) {
+            final Expression condition = clause.getWhenExpression();
+            // Boolean-typed conditions go through the filter evaluator;
+            // anything else is computed as a plain value.
+            final Object conditionValue;
+            if (zetaSQLFilter.isConditionExpr(condition)) {
+                conditionValue =
+                        zetaSQLFilter.executeFilter(condition, inputFields);
+            } else {
+                conditionValue = computeForValue(condition, inputFields);
+            }
+
+            // Also covers "case [column] when column1 compare other": a true
+            // condition wins on its own; only otherwise is the value compared
+            // with the operand.
+            final boolean conditionHolds =
+                    conditionValue instanceof Boolean && (boolean) conditionValue;
+            if (conditionHolds
+                    || zetaSQLFilter.equalsToExpr(
+                            Pair.of(operandValue, conditionValue))) {
+                return computeForValue(clause.getThenExpression(), inputFields);
+            }
+        }
+
+        final Expression fallback = caseExpression.getElseExpression();
+        if (fallback == null) {
+            return null;
+        }
+        return computeForValue(fallback, inputFields);
+    }
+
     public Object executeFunctionExpr(String functionName, List<Object> args) {
         switch (functionName.toUpperCase()) {
             case ASCII:
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 4db9b95af2..968566e45a 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -26,7 +26,10 @@ import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import net.sf.jsqlparser.expression.BinaryExpression;
+import net.sf.jsqlparser.expression.CaseExpression;
 import net.sf.jsqlparser.expression.CastExpression;
 import net.sf.jsqlparser.expression.DoubleValue;
 import net.sf.jsqlparser.expression.Expression;
@@ -37,12 +40,23 @@ import net.sf.jsqlparser.expression.NullValue;
 import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
 import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.expression.operators.relational.InExpression;
+import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
 import net.sf.jsqlparser.schema.Column;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class ZetaSQLType {
     public static final String DECIMAL = "DECIMAL";
@@ -106,6 +120,20 @@ public class ZetaSQLType {
         if (expression instanceof Concat) {
             return BasicType.STRING_TYPE;
         }
+
+        if (expression instanceof CaseExpression) {
+            return getCaseType((CaseExpression) expression);
+        }
+        if (expression instanceof ComparisonOperator
+                || expression instanceof IsNullExpression
+                || expression instanceof InExpression
+                || expression instanceof LikeExpression
+                || expression instanceof AndExpression
+                || expression instanceof OrExpression
+                || expression instanceof NotEqualsTo) {
+            return BasicType.BOOLEAN_TYPE;
+        }
+
         if (expression instanceof CastExpression) {
             return getCastType((CastExpression) expression);
         }
@@ -149,6 +177,74 @@ public class ZetaSQLType {
                 String.format("Unsupported SQL Expression: %s ", 
expression.toString()));
     }
 
+    /**
+     * Checks whether a SQL type is positioned between {@link SqlType#TINYINT}
+     * and {@link SqlType#DECIMAL} (both inclusive) in the {@link SqlType} enum.
+     *
+     * <p>The comparison uses enum ordering, so the result depends on the
+     * declaration order of the {@code SqlType} constants.
+     *
+     * @param type the SQL type to test; must not be {@code null}
+     * @return {@code true} when the type falls inside that range
+     */
+    public boolean isNumberType(SqlType type) {
+        if (type.compareTo(SqlType.TINYINT) < 0) {
+            return false;
+        }
+        return type.compareTo(SqlType.DECIMAL) <= 0;
+    }
+
+    /**
+     * Picks the type able to represent both operands.
+     *
+     * <ul>
+     *   <li>A {@code null} or {@link BasicType#VOID_TYPE} operand yields the
+     *       other operand.
+     *   <li>Equal types yield the left operand.
+     *   <li>If either operand is DECIMAL, a new {@link DecimalType} is built
+     *       from the largest precision and the largest scale found among the
+     *       DECIMAL operands.
+     *   <li>Otherwise the operand whose {@link SqlType} is ordered later in
+     *       the enum is returned.
+     * </ul>
+     *
+     * @param leftType first type, may be {@code null}
+     * @param rightType second type, may be {@code null}
+     * @return the selected type
+     * @throws TransformException when the types differ and at least one of
+     *     them is not a number type
+     */
+    public SeaTunnelDataType<?> getMaxType(
+            SeaTunnelDataType<?> leftType, SeaTunnelDataType<?> rightType) {
+        if (leftType == null || BasicType.VOID_TYPE.equals(leftType)) {
+            return rightType;
+        }
+        if (rightType == null || BasicType.VOID_TYPE.equals(rightType)) {
+            return leftType;
+        }
+        if (leftType.equals(rightType)) {
+            return leftType;
+        }
+
+        final SqlType leftSql = leftType.getSqlType();
+        final SqlType rightSql = rightType.getSqlType();
+        if (!isNumberType(leftSql) || !isNumberType(rightSql)) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    leftType + " type not compatible " + rightType);
+        }
+
+        final boolean leftIsDecimal = leftSql == SqlType.DECIMAL;
+        final boolean rightIsDecimal = rightSql == SqlType.DECIMAL;
+        if (!leftIsDecimal && !rightIsDecimal) {
+            // Plain numeric types: the later enum position wins, ties go right.
+            return leftSql.compareTo(rightSql) <= 0 ? rightType : leftType;
+        }
+
+        // NOTE(review): precision and scale are maximised independently, so
+        // e.g. DECIMAL(10,0) with DECIMAL(5,4) gives DECIMAL(10,4), which has
+        // fewer integer digits than the first operand. Non-DECIMAL operands
+        // do not contribute at all. Confirm this is acceptable.
+        int precision = 0;
+        int scale = 0;
+        if (leftIsDecimal) {
+            final DecimalType leftDecimal = (DecimalType) leftType;
+            precision = leftDecimal.getPrecision();
+            scale = leftDecimal.getScale();
+        }
+        if (rightIsDecimal) {
+            final DecimalType rightDecimal = (DecimalType) rightType;
+            precision = Math.max(rightDecimal.getPrecision(), precision);
+            scale = Math.max(rightDecimal.getScale(), scale);
+        }
+        return new DecimalType(precision, scale);
+    }
+
+    /**
+     * Folds a collection of types into a single type by applying the
+     * two-argument {@code getMaxType} pairwise in iteration order.
+     *
+     * @param types the candidate types; must contain at least one element
+     * @return the type selected across all candidates
+     * @throws TransformException when {@code types} is {@code null} or empty,
+     *     or when the pairwise comparison rejects two of the types
+     */
+    public SeaTunnelDataType<?> getMaxType(
+            Collection<SeaTunnelDataType<?>> types) {
+        if (CollectionUtils.isEmpty(types)) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    "getMaxType parameter is null");
+        }
+        // A null left operand makes the pairwise overload return the right
+        // operand, so seeding with null is the same as starting from the
+        // first element.
+        SeaTunnelDataType<?> widest = null;
+        for (SeaTunnelDataType<?> candidate : types) {
+            widest = getMaxType(widest, candidate);
+        }
+        return widest;
+    }
+
+    /**
+     * Derives the result type of a CASE expression from the types of all its
+     * THEN expressions plus the ELSE expression when one is present.
+     *
+     * @param caseExpression the parsed CASE expression
+     * @return the type selected by {@code getMaxType} across all branches
+     */
+    private SeaTunnelDataType<?> getCaseType(CaseExpression caseExpression) {
+        // Collect into an explicit ArrayList: Collectors.toSet() gives no
+        // guarantee that the returned set is mutable, yet the ELSE type is
+        // added afterwards. A list also keeps the clause order, so the fold
+        // in getMaxType is deterministic; duplicate types are harmless there.
+        final Collection<SeaTunnelDataType<?>> branchTypes =
+                caseExpression.getWhenClauses().stream()
+                        .map(WhenClause::getThenExpression)
+                        .map(this::getExpressionType)
+                        .collect(Collectors.toCollection(ArrayList::new));
+        final Expression elseExpression = caseExpression.getElseExpression();
+        if (elseExpression != null) {
+            branchTypes.add(getExpressionType(elseExpression));
+        }
+        return getMaxType(branchTypes);
+    }
+
     private SeaTunnelDataType<?> getCastType(CastExpression castExpression) {
         String dataType = castExpression.getType().getDataType();
         switch (dataType.toUpperCase()) {
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
index b9eeb48b40..0039f0cade 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.transform.sql.zeta.functions;
 
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 
@@ -27,6 +29,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.List;
 
 public class SystemFunction {
@@ -62,6 +65,18 @@ public class SystemFunction {
         return v1;
     }
 
+    /**
+     * Casts a value to the given SeaTunnel type by building the argument list
+     * expected by {@code castAs(List)} and delegating to it.
+     *
+     * <p>The list holds the value and the SQL type name; when the target is
+     * exactly a {@link DecimalType}, its precision and scale are appended.
+     *
+     * @param arg the value to cast
+     * @param type the target type; must not be {@code null}
+     * @return the result of {@code castAs(List)} for those arguments
+     */
+    public static Object castAs(Object arg, SeaTunnelDataType<?> type) {
+        final List<Object> castArgs = new ArrayList<>(4);
+        castArgs.add(arg);
+        castArgs.add(type.getSqlType().toString());
+        if (!DecimalType.class.equals(type.getClass())) {
+            return castAs(castArgs);
+        }
+        final DecimalType targetDecimal = (DecimalType) type;
+        castArgs.add(targetDecimal.getPrecision());
+        castArgs.add(targetDecimal.getScale());
+        return castAs(castArgs);
+    }
+
     public static Object castAs(List<Object> args) {
         Object v1 = args.get(0);
         String v2 = (String) args.get(1);

Reply via email to