This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e8ac5b12b4f [FLINK-39011][table-api-java] Accept UnresolvedDataType in 
BaseExpression.cast()
e8ac5b12b4f is described below

commit e8ac5b12b4f7188b86cb3ecd9fc9ee5d9ccc3410
Author: Timo Walther <[email protected]>
AuthorDate: Wed Feb 4 16:23:54 2026 +0100

    [FLINK-39011][table-api-java] Accept UnresolvedDataType in 
BaseExpression.cast()
    
    This closes #27513.
---
 .../flink/table/api/internal/BaseExpressions.java  | 56 +++++++++++++-----
 .../table/expressions/ApiExpressionUtils.java      |  6 ++
 .../table/expressions/ApiExpressionVisitor.java    |  6 +-
 .../expressions/UnresolvedCallExpression.java      |  4 +-
 .../expressions/UnresolvedReferenceExpression.java |  2 +-
 ...n.java => UnresolvedTypeLiteralExpression.java} | 39 +++++++------
 .../expressions/resolver/ExpressionResolver.java   |  5 +-
 .../resolver/rules/ReferenceResolverRule.java      |  8 +--
 .../expressions/resolver/rules/ResolverRules.java  |  6 +-
 .../resolver/rules/TypeResolverRule.java           | 67 ++++++++++++++++++++++
 .../utils/ApiExpressionDefaultVisitor.java         |  6 ++
 .../resolver/ExpressionResolverTest.java           | 15 ++++-
 12 files changed, 176 insertions(+), 44 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 42a86cee711..d2afdc24892 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -28,12 +28,12 @@ import org.apache.flink.table.api.JsonQueryWrapper;
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
 
 import java.util.Arrays;
 import java.util.stream.Stream;
@@ -48,6 +48,7 @@ import static 
org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInter
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInterval;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedType;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
@@ -618,34 +619,61 @@ public abstract class BaseExpressions<InType, OutType> {
     }
 
     /**
-     * Returns a new value being cast to {@code toType}. A cast error throws 
an exception and fails
-     * the job. When performing a cast operation that may fail, like {@link 
DataTypes#STRING()} to
-     * {@link DataTypes#INT()}, one should rather use {@link 
#tryCast(DataType)}, in order to handle
-     * errors. If {@link 
ExecutionConfigOptions#TABLE_EXEC_LEGACY_CAST_BEHAVIOUR} is enabled, this
-     * function behaves like {@link #tryCast(DataType)}.
+     * Returns a new value being cast to {@code toType}.
      *
-     * <p>E.g. {@code "42".cast(DataTypes.INT())} returns {@code 42}; {@code
-     * null.cast(DataTypes.STRING())} returns {@code null} of type {@link 
DataTypes#STRING()};
-     * {@code "non-number".cast(DataTypes.INT())} throws an exception and 
fails the job.
+     * <p>A cast error throws an exception and fails the job. When performing 
a cast operation that
+     * may fail, like {@link DataTypes#STRING()} to {@link DataTypes#INT()}, 
one should rather use
+     * {@link #tryCast(DataType)}, to handle errors.
+     *
+     * <p>E.g. {@code lit("42").cast(DataTypes.INT())} returns {@code 42}; 
{@code
+     * lit(null).cast(DataTypes.STRING())} returns {@code null} of type {@link 
DataTypes#STRING()};
+     * {@code lit("non-number").cast(DataTypes.INT())} throws an exception and 
fails the job.
      */
     public OutType cast(DataType toType) {
         return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), 
typeLiteral(toType)));
     }
 
+    /**
+     * Casts this expression to {@code toType} and returns the resulting value.
+     *
+     * <p>This overload accepts an {@link UnresolvedDataType}, such as one obtained from {@link
+     * DataTypes#of(Class)} or {@link DataTypes#of(String)}. The data type will be resolved to a
+     * fully qualified {@link DataType}.
+     *
+     * @see #cast(DataType)
+     */
+    public OutType cast(UnresolvedDataType toType) {
+        final Expression castCall = unresolvedCall(CAST, toExpr(), unresolvedType(toType));
+        return toApiSpecificExpression(castCall);
+    }
+
     /**
      * Like {@link #cast(DataType)}, but in case of error, returns {@code 
null} rather than failing
      * the job.
      *
-     * <p>E.g. {@code "42".tryCast(DataTypes.INT())} returns {@code 42}; {@code
-     * null.tryCast(DataTypes.STRING())} returns {@code null} of type {@link 
DataTypes#STRING()};
-     * {@code "non-number".tryCast(DataTypes.INT())} returns {@code null} of 
type {@link
-     * DataTypes#INT()}; {@code 
coalesce("non-number".tryCast(DataTypes.INT()), 0)} returns {@code
-     * 0} of type {@link DataTypes#INT()}.
+     * <p>E.g. {@code lit("42").tryCast(DataTypes.INT())} returns {@code 42}; 
{@code
+     * lit(null).tryCast(DataTypes.STRING())} returns {@code null} of type 
{@link
+     * DataTypes#STRING()}; {@code lit("non-number").tryCast(DataTypes.INT())} 
returns {@code null}
+     * of type {@link DataTypes#INT()}; {@code 
coalesce(lit("non-number").tryCast(DataTypes.INT()),
+     * 0)} returns {@code 0} of type {@link DataTypes#INT()}.
      */
     public OutType tryCast(DataType toType) {
         return toApiSpecificExpression(unresolvedCall(TRY_CAST, toExpr(), 
typeLiteral(toType)));
     }
 
+    /**
+     * Like {@link #cast(UnresolvedDataType)}, but in case of error, returns {@code null} rather
+     * than failing the job.
+     *
+     * <p>This overload accepts an {@link UnresolvedDataType}, such as one obtained from {@link
+     * DataTypes#of(Class)} or {@link DataTypes#of(String)}. The data type will be resolved to a
+     * fully qualified {@link DataType}.
+     *
+     * @see #tryCast(DataType)
+     */
+    public OutType tryCast(UnresolvedDataType toType) {
+        final Expression tryCastCall =
+                unresolvedCall(TRY_CAST, toExpr(), unresolvedType(toType));
+        return toApiSpecificExpression(tryCastCall);
+    }
+
     /** Specifies ascending order of an expression i.e. a field for orderBy 
unresolvedCall. */
     public OutType asc() {
         return toApiSpecificExpression(unresolvedCall(ORDER_ASC, toExpr()));
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index bcad8cccfce..095dc754d01 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -265,6 +266,11 @@ public final class ApiExpressionUtils {
         return new TypeLiteralExpression(dataType);
     }
 
+    /**
+     * Wraps the given {@link UnresolvedDataType} in a new {@link
+     * UnresolvedTypeLiteralExpression}.
+     */
+    public static UnresolvedTypeLiteralExpression unresolvedType(
+            UnresolvedDataType unresolvedDataType) {
+        return new UnresolvedTypeLiteralExpression(unresolvedDataType);
+    }
+
     public static UnresolvedReferenceExpression unresolvedRef(String name) {
         return new UnresolvedReferenceExpression(name);
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
index b64b77d24fb..6ecea0cb84e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
@@ -25,7 +25,9 @@ import org.apache.flink.annotation.Internal;
 public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {
 
     public final R visit(Expression other) {
-        if (other instanceof UnresolvedReferenceExpression) {
+        if (other instanceof UnresolvedTypeLiteralExpression) {
+            return visit((UnresolvedTypeLiteralExpression) other);
+        } else if (other instanceof UnresolvedReferenceExpression) {
             return visit((UnresolvedReferenceExpression) other);
         } else if (other instanceof TableReferenceExpression) {
             return visit((TableReferenceExpression) other);
@@ -62,6 +64,8 @@ public abstract class ApiExpressionVisitor<R> implements 
ExpressionVisitor<R> {
     // unresolved API expressions
     // 
--------------------------------------------------------------------------------------------
 
+    public abstract R visit(UnresolvedTypeLiteralExpression 
unresolvedTypeExpression);
+
     public abstract R visit(UnresolvedReferenceExpression unresolvedReference);
 
     public abstract R visit(LookupCallExpression lookupCall);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
index 235c044b986..256725c2144 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
@@ -34,10 +34,10 @@ import java.util.stream.Collectors;
 /**
  * Unresolved call expression for calling a function identified by a {@link 
FunctionDefinition}.
  *
- * <p>This is purely an API facing expression with unvalidated arguments and 
unknown output data
+ * <p>This is purely an API-facing expression with unvalidated arguments and 
unknown output data
  * type.
  *
- * <p>A unresolved call contains:
+ * <p>An unresolved call contains:
  *
  * <ul>
  *   <li>a {@link FunctionDefinition} that identifies the function to be called
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
index 4dfd2363913..5587641b442 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 /**
  * An unresolved reference to a field, table, or local reference.
  *
- * <p>This is a purely API facing expression that will be resolved into {@link
+ * <p>This is a purely API-facing expression that will be resolved into {@link
  * FieldReferenceExpression}, {@link LocalReferenceExpression}, or {@link 
TableReferenceExpression}.
  */
 @PublicEvolving
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedTypeLiteralExpression.java
similarity index 57%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedTypeLiteralExpression.java
index 4dfd2363913..2790f09a471 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedReferenceExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedTypeLiteralExpression.java
@@ -19,40 +19,45 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.UnresolvedDataType;
 
-import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
+
 /**
- * An unresolved reference to a field, table, or local reference.
+ * Unresolved type literal for a given {@link UnresolvedDataType}.
  *
- * <p>This is a purely API facing expression that will be resolved into {@link
- * FieldReferenceExpression}, {@link LocalReferenceExpression}, or {@link 
TableReferenceExpression}.
+ * <p>This is purely an API-facing expression with unvalidated arguments and 
unknown output data
+ * type.
  */
 @PublicEvolving
-public final class UnresolvedReferenceExpression implements Expression, 
Serializable {
+public class UnresolvedTypeLiteralExpression implements Expression {
+
+    private final UnresolvedDataType unresolvedDataType;
 
-    private final String name;
+    /**
+     * Creates an unresolved type literal for the given data type.
+     *
+     * @param unresolvedDataType the not-yet-resolved data type; must not be {@code null}
+     * @throws NullPointerException if {@code unresolvedDataType} is {@code null}
+     */
+    UnresolvedTypeLiteralExpression(UnresolvedDataType unresolvedDataType) {
+        // Fail fast: equals(), asSummaryString() and resolve() dereference this field.
+        this.unresolvedDataType =
+                Objects.requireNonNull(unresolvedDataType, "unresolvedDataType must not be null");
+    }
 
-    UnresolvedReferenceExpression(String name) {
-        this.name = Preconditions.checkNotNull(name);
+    public UnresolvedDataType getUnresolvedDataType() {
+        return unresolvedDataType;
     }
 
-    public String getName() {
-        return name;
+    public TypeLiteralExpression resolve(DataTypeFactory dataTypeFactory) {
+        return typeLiteral(unresolvedDataType.toDataType(dataTypeFactory));
     }
 
     @Override
     public String asSummaryString() {
-        return name;
+        return unresolvedDataType.toString();
     }
 
     @Override
     public List<Expression> getChildren() {
-        return Collections.emptyList();
+        return List.of();
     }
 
     @Override
@@ -68,13 +73,13 @@ public final class UnresolvedReferenceExpression implements 
Expression, Serializ
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        UnresolvedReferenceExpression that = (UnresolvedReferenceExpression) o;
-        return Objects.equals(name, that.name);
+        final UnresolvedTypeLiteralExpression that = 
(UnresolvedTypeLiteralExpression) o;
+        return unresolvedDataType.equals(that.unresolvedDataType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name);
+        return Objects.hash(unresolvedDataType);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 210e1dfd85d..0c17ec25e2a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -94,8 +94,9 @@ public class ExpressionResolver {
                 ResolverRules.LOOKUP_CALL_BY_NAME,
                 ResolverRules.FLATTEN_STAR_REFERENCE,
                 ResolverRules.EXPAND_COLUMN_FUNCTIONS,
+                ResolverRules.RESOLVE_TYPE,
                 ResolverRules.OVER_WINDOWS,
-                ResolverRules.FIELD_RESOLVE,
+                ResolverRules.RESOLVE_FIELD,
                 ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
                 ResolverRules.RESOLVE_SQL_CALL,
                 ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
@@ -264,7 +265,7 @@ public class ExpressionResolver {
 
     private Expression resolveFieldsInSingleExpression(Expression expression) {
         List<Expression> expressions =
-                ResolverRules.FIELD_RESOLVE.apply(
+                ResolverRules.RESOLVE_FIELD.apply(
                         Collections.singletonList(expression), new 
ExpressionResolverContext());
 
         if (expressions.size() != 1) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
index b7606f49e03..0b814d07ff4 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 
@@ -30,13 +31,10 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
 
 /**
- * Resolves {@link UnresolvedReferenceExpression} to either {@link
- * org.apache.flink.table.expressions.FieldReferenceExpression}, {@link
- * org.apache.flink.table.expressions.TableReferenceExpression}, or {@link 
LocalReferenceExpression}
- * in this order.
+ * Resolves {@link UnresolvedReferenceExpression} to either {@link 
FieldReferenceExpression}, {@link
+ * TableReferenceExpression}, or {@link LocalReferenceExpression} in this 
order.
  */
 @Internal
 final class ReferenceResolverRule implements ResolverRule {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 97f05cf83d4..c006d3634e5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -22,16 +22,20 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.UnresolvedDataType;
 
 /** Contains instances of {@link ResolverRule}. */
 @Internal
 public final class ResolverRules {
 
+    /** Resolves {@link UnresolvedDataType}. See {@link TypeResolverRule} for 
details. */
+    public static final ResolverRule RESOLVE_TYPE = new TypeResolverRule();
+
     /**
      * Resolves {@link UnresolvedReferenceExpression}. See {@link 
ReferenceResolverRule} for
      * details.
      */
-    public static final ResolverRule FIELD_RESOLVE = new 
ReferenceResolverRule();
+    public static final ResolverRule RESOLVE_FIELD = new 
ReferenceResolverRule();
 
     /** Resolves {@link SqlCallExpression}s. */
     public static final ResolverRule RESOLVE_SQL_CALL = new 
ResolveSqlCallRule();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/TypeResolverRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/TypeResolverRule.java
new file mode 100644
index 00000000000..cd2a3580461
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/TypeResolverRule.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.resolver.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedTypeLiteralExpression;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Resolver rule that replaces every {@link UnresolvedTypeLiteralExpression} with a {@link
+ * TypeLiteralExpression}.
+ */
+@Internal
+final class TypeResolverRule implements ResolverRule {
+
+    @Override
+    public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
+        return expression.stream()
+                .map(topLevel -> resolveTypeLiterals(topLevel, context))
+                .collect(Collectors.toList());
+    }
+
+    /** Runs a fresh visitor over a single top-level expression. */
+    private static Expression resolveTypeLiterals(Expression root, ResolutionContext context) {
+        return root.accept(new TypeLiteralResolvingVisitor(context));
+    }
+
+    /** Visitor that descends into unresolved calls and resolves the type literals it finds. */
+    private static class TypeLiteralResolvingVisitor extends RuleExpressionVisitor<Expression> {
+
+        TypeLiteralResolvingVisitor(ResolutionContext resolutionContext) {
+            super(resolutionContext);
+        }
+
+        @Override
+        public Expression visit(UnresolvedCallExpression unresolvedCall) {
+            // Rewrite the arguments first, then rebuild the call around them.
+            final List<Expression> rewrittenArgs =
+                    unresolvedCall.getChildren().stream()
+                            .map(child -> child.accept(this))
+                            .collect(Collectors.toList());
+            return unresolvedCall.replaceArgs(rewrittenArgs);
+        }
+
+        @Override
+        public Expression visit(UnresolvedTypeLiteralExpression unresolvedTypeExpression) {
+            return unresolvedTypeExpression.resolve(resolutionContext.typeFactory());
+        }
+
+        @Override
+        protected Expression defaultMethod(Expression expression) {
+            // Any other expression kind passes through unchanged.
+            return expression;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
index 6e030345409..ded0bfecb9a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.UnresolvedTypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 
 /**
@@ -101,6 +102,11 @@ public abstract class ApiExpressionDefaultVisitor<T> 
extends ApiExpressionVisito
         return defaultMethod(unresolvedReference);
     }
 
+    // Delegates unresolved type literals to defaultMethod.
+    @Override
+    public T visit(UnresolvedTypeLiteralExpression unresolvedTypeExpression) {
+        return defaultMethod(unresolvedTypeExpression);
+    }
+
     @Override
     public T visit(LookupCallExpression lookupCall) {
         return defaultMethod(lookupCall);
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index 21280805a73..92b6324ac60 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -61,6 +62,7 @@ import java.util.stream.Stream;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
 import static org.apache.flink.table.api.Expressions.col;
+import static org.apache.flink.table.api.Expressions.lit;
 import static org.apache.flink.table.api.Expressions.range;
 import static org.apache.flink.table.api.Expressions.withColumns;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
@@ -284,8 +286,19 @@ class ExpressionResolverTest {
                                 .inputSchemas(
                                         TableSchema.builder().field("i", 
DataTypes.INT()).build())
                                 .select(col("i"))
+                                .equalTo(new FieldReferenceExpression("i", 
DataTypes.INT(), 0, 0))),
+                Arguments.of(
+                        TestSpec.test("Test type resolution with 
cast(UnresolvedDataType)")
+                                .inputSchemas(TableSchema.builder().build())
+                                .select(lit(1).cast(DataTypes.of("BIGINT")))
                                 .equalTo(
-                                        new FieldReferenceExpression("i", 
DataTypes.INT(), 0, 0))));
+                                        CallExpression.permanent(
+                                                
BuiltInFunctionDefinitions.CAST,
+                                                List.of(
+                                                        new 
ValueLiteralExpression(1),
+                                                        new 
TypeLiteralExpression(
+                                                                
DataTypes.BIGINT())),
+                                                
DataTypes.BIGINT().notNull()))));
     }
 
     @ParameterizedTest(name = "{0}")

Reply via email to