This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68dc82e9f438dbce6e5cf7a91c2fd14fddae7eaf
Author: Timo Walther <[email protected]>
AuthorDate: Mon Apr 12 14:34:47 2021 +0200

    [hotfix][table-api-java] Improve built-in functions as parameters of 
user-defined functions
---
 .../table/expressions/ApiExpressionUtils.java      |  6 ++++--
 .../expressions/UnresolvedCallExpression.java      |  6 ++----
 .../rules/QualifyBuiltInFunctionsRule.java         | 25 +++++++++++++---------
 .../table/types/inference/TypeInferenceUtil.java   | 15 +++++++------
 .../inference/TypeInferenceOperandChecker.java     |  3 +--
 .../planner/functions/MiscFunctionsITCase.java     |  7 ++++--
 6 files changed, 36 insertions(+), 26 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 2d87882..2773ec0 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -260,14 +262,14 @@ public final class ApiExpressionUtils {
     }
 
     public static UnresolvedCallExpression unresolvedCall(
-            FunctionIdentifier functionIdentifier,
+            @Nullable FunctionIdentifier functionIdentifier,
             FunctionDefinition functionDefinition,
             Expression... args) {
         return unresolvedCall(functionIdentifier, functionDefinition, 
Arrays.asList(args));
     }
 
     public static UnresolvedCallExpression unresolvedCall(
-            FunctionIdentifier functionIdentifier,
+            @Nullable FunctionIdentifier functionIdentifier,
             FunctionDefinition functionDefinition,
             List<Expression> args) {
         return new UnresolvedCallExpression(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
index 7425f53..f8ef67b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
@@ -55,12 +55,10 @@ public final class UnresolvedCallExpression implements 
Expression {
     private final List<Expression> args;
 
     UnresolvedCallExpression(
-            FunctionIdentifier functionIdentifier,
+            @Nullable FunctionIdentifier functionIdentifier,
             FunctionDefinition functionDefinition,
             List<Expression> args) {
-        this.functionIdentifier =
-                Preconditions.checkNotNull(
-                        functionIdentifier, "Function identifier must not be 
null.");
+        this.functionIdentifier = functionIdentifier;
         this.functionDefinition =
                 Preconditions.checkNotNull(
                         functionDefinition, "Function definition must not be 
null.");
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
index 331fe75..16a5d9e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -51,25 +53,28 @@ final class QualifyBuiltInFunctionsRule implements 
ResolverRule {
 
         @Override
         public Expression visit(UnresolvedCallExpression unresolvedCall) {
+            final FunctionDefinition definition = 
unresolvedCall.getFunctionDefinition();
+
+            final FunctionIdentifier identifier;
             if (!unresolvedCall.getFunctionIdentifier().isPresent()
-                    && unresolvedCall.getFunctionDefinition()
-                            instanceof BuiltInFunctionDefinition) {
+                    && definition instanceof BuiltInFunctionDefinition) {
                 final FunctionLookup.Result functionLookup =
                         resolutionContext
                                 .functionLookup()
                                 .lookupBuiltInFunction(
                                         ((BuiltInFunctionDefinition)
                                                 
unresolvedCall.getFunctionDefinition()));
-
-                return ApiExpressionUtils.unresolvedCall(
-                        functionLookup.getFunctionIdentifier(),
-                        functionLookup.getFunctionDefinition(),
-                        unresolvedCall.getChildren().stream()
-                                .map(c -> c.accept(this))
-                                .collect(Collectors.toList()));
+                identifier = functionLookup.getFunctionIdentifier();
+            } else {
+                identifier = 
unresolvedCall.getFunctionIdentifier().orElse(null);
             }
 
-            return unresolvedCall;
+            return ApiExpressionUtils.unresolvedCall(
+                    identifier,
+                    definition,
+                    unresolvedCall.getChildren().stream()
+                            .map(c -> c.accept(this))
+                            .collect(Collectors.toList()));
         }
 
         @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
index 32280ce..1a045ae 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
@@ -96,12 +96,12 @@ public final class TypeInferenceUtil {
      * <p>This includes casts that need to be inserted, reordering of 
arguments (*), or insertion of
      * default values (*) where (*) is future work.
      */
-    public static AdaptedCallContext adaptArguments(
+    public static CallContext adaptArguments(
             TypeInference typeInference, CallContext callContext, @Nullable 
DataType outputType) {
         return adaptArguments(typeInference, callContext, outputType, true);
     }
 
-    private static AdaptedCallContext adaptArguments(
+    private static CallContext adaptArguments(
             TypeInference typeInference,
             CallContext callContext,
             @Nullable DataType outputType,
@@ -129,6 +129,10 @@ public final class TypeInferenceUtil {
             final DataType expectedType = expectedTypes.get(pos);
             final DataType actualType = actualTypes.get(pos);
             if (!supportsImplicitCast(actualType.getLogicalType(), 
expectedType.getLogicalType())) {
+                if (!throwOnInferInputFailure) {
+                    // abort the adaption, e.g. if a NULL is passed for a NOT 
NULL argument
+                    return callContext;
+                }
                 throw new ValidationException(
                         String.format(
                                 "Invalid argument type at position %d. Data 
type %s expected but %s passed.",
@@ -261,9 +265,8 @@ public final class TypeInferenceUtil {
                             isGroupedAggregation);
 
             // We might not be able to infer the input types at this moment, 
if the surrounding
-            // function
-            // does not provide an explicit input type strategy.
-            final AdaptedCallContext adaptedContext =
+            // function does not provide an explicit input type strategy.
+            final CallContext adaptedContext =
                     adaptArguments(typeInference, callContext, null, false);
             return typeInference
                     .getInputTypeStrategy()
@@ -324,7 +327,7 @@ public final class TypeInferenceUtil {
             throw createInvalidInputException(typeInference, callContext, e);
         }
 
-        final AdaptedCallContext adaptedCallContext;
+        final CallContext adaptedCallContext;
         try {
             // use information of surrounding call to determine output type of 
this call
             final DataType outputType;
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
index 422f357..a98efee 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
-import org.apache.flink.table.types.inference.utils.AdaptedCallContext;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -118,7 +117,7 @@ public final class TypeInferenceOperandChecker implements 
SqlOperandTypeChecker
     // 
--------------------------------------------------------------------------------------------
 
     private boolean checkOperandTypesOrError(SqlCallBinding callBinding, 
CallContext callContext) {
-        final AdaptedCallContext adaptedCallContext;
+        final CallContext adaptedCallContext;
         try {
             adaptedCallContext = adaptArguments(typeInference, callContext, 
null);
         } catch (ValidationException e) {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
index 6d242fc..9915b80 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
@@ -79,8 +79,11 @@ public class MiscFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 "IFNULL(f1, f0)",
                                 new BigDecimal("123.45"),
                                 DataTypes.DECIMAL(12, 2).notNull())
-                        .testSqlResult(
-                                "TakesNotNull(IFNULL(f0, 12))", 12, 
DataTypes.INT().notNull()),
+                        .testResult(
+                                call("TakesNotNull", $("f0").ifNull(12)),
+                                "TakesNotNull(IFNULL(f0, 12))",
+                                12,
+                                DataTypes.INT().notNull()),
                 TestSpec.forExpression("SQL call")
                         .onFieldsWithData(null, 12, "Hello World")
                         .andDataTypes(

Reply via email to