This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68dc82e9f438dbce6e5cf7a91c2fd14fddae7eaf Author: Timo Walther <[email protected]> AuthorDate: Mon Apr 12 14:34:47 2021 +0200 [hotfix][table-api-java] Improve built-in functions as parameters of user-defined functions --- .../table/expressions/ApiExpressionUtils.java | 6 ++++-- .../expressions/UnresolvedCallExpression.java | 6 ++---- .../rules/QualifyBuiltInFunctionsRule.java | 25 +++++++++++++--------- .../table/types/inference/TypeInferenceUtil.java | 15 +++++++------ .../inference/TypeInferenceOperandChecker.java | 3 +-- .../planner/functions/MiscFunctionsITCase.java | 7 ++++-- 6 files changed, 36 insertions(+), 26 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java index 2d87882..2773ec0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java @@ -33,6 +33,8 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import javax.annotation.Nullable; + import java.lang.reflect.Array; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -260,14 +262,14 @@ public final class ApiExpressionUtils { } public static UnresolvedCallExpression unresolvedCall( - FunctionIdentifier functionIdentifier, + @Nullable FunctionIdentifier functionIdentifier, FunctionDefinition functionDefinition, Expression... args) { return unresolvedCall(functionIdentifier, functionDefinition, Arrays.asList(args)); } public static UnresolvedCallExpression unresolvedCall( - FunctionIdentifier functionIdentifier, + @Nullable FunctionIdentifier functionIdentifier, FunctionDefinition functionDefinition, List<Expression> args) { return new UnresolvedCallExpression( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java index 7425f53..f8ef67b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java @@ -55,12 +55,10 @@ public final class UnresolvedCallExpression implements Expression { private final List<Expression> args; UnresolvedCallExpression( - FunctionIdentifier functionIdentifier, + @Nullable FunctionIdentifier functionIdentifier, FunctionDefinition functionDefinition, List<Expression> args) { - this.functionIdentifier = - Preconditions.checkNotNull( - functionIdentifier, "Function identifier must not be null."); + this.functionIdentifier = functionIdentifier; this.functionDefinition = Preconditions.checkNotNull( functionDefinition, "Function definition must not be null."); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java index 331fe75..16a5d9e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java @@ -25,6 +25,8 @@ import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinition; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionIdentifier; import java.util.List; import java.util.stream.Collectors; @@ -51,25 +53,28 @@ final class QualifyBuiltInFunctionsRule implements ResolverRule { @Override public Expression visit(UnresolvedCallExpression unresolvedCall) { + final FunctionDefinition definition = unresolvedCall.getFunctionDefinition(); + + final FunctionIdentifier identifier; if (!unresolvedCall.getFunctionIdentifier().isPresent() - && unresolvedCall.getFunctionDefinition() - instanceof BuiltInFunctionDefinition) { + && definition instanceof BuiltInFunctionDefinition) { final FunctionLookup.Result functionLookup = resolutionContext .functionLookup() .lookupBuiltInFunction( ((BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition())); - - return ApiExpressionUtils.unresolvedCall( - functionLookup.getFunctionIdentifier(), - functionLookup.getFunctionDefinition(), - unresolvedCall.getChildren().stream() - .map(c -> c.accept(this)) - .collect(Collectors.toList())); + identifier = functionLookup.getFunctionIdentifier(); + } else { + identifier = unresolvedCall.getFunctionIdentifier().orElse(null); } - return unresolvedCall; + return ApiExpressionUtils.unresolvedCall( + identifier, + definition, + unresolvedCall.getChildren().stream() + .map(c -> c.accept(this)) + .collect(Collectors.toList())); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java index 32280ce..1a045ae 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java @@ -96,12 +96,12 @@ public final class TypeInferenceUtil { * <p>This includes casts that need to be inserted, reordering of arguments (*), or insertion of * default values (*) where (*) is future work. */ - public static AdaptedCallContext adaptArguments( + public static CallContext adaptArguments( TypeInference typeInference, CallContext callContext, @Nullable DataType outputType) { return adaptArguments(typeInference, callContext, outputType, true); } - private static AdaptedCallContext adaptArguments( + private static CallContext adaptArguments( TypeInference typeInference, CallContext callContext, @Nullable DataType outputType, @@ -129,6 +129,10 @@ public final class TypeInferenceUtil { final DataType expectedType = expectedTypes.get(pos); final DataType actualType = actualTypes.get(pos); if (!supportsImplicitCast(actualType.getLogicalType(), expectedType.getLogicalType())) { + if (!throwOnInferInputFailure) { + // abort the adaption, e.g. if a NULL is passed for a NOT NULL argument + return callContext; + } throw new ValidationException( String.format( "Invalid argument type at position %d. Data type %s expected but %s passed.", @@ -261,9 +265,8 @@ public final class TypeInferenceUtil { isGroupedAggregation); // We might not be able to infer the input types at this moment, if the surrounding - // function - // does not provide an explicit input type strategy. - final AdaptedCallContext adaptedContext = + // function does not provide an explicit input type strategy. + final CallContext adaptedContext = adaptArguments(typeInference, callContext, null, false); return typeInference .getInputTypeStrategy() @@ -324,7 +327,7 @@ public final class TypeInferenceUtil { throw createInvalidInputException(typeInference, callContext, e); } - final AdaptedCallContext adaptedCallContext; + final CallContext adaptedCallContext; try { // use information of surrounding call to determine output type of this call final DataType outputType; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java index 422f357..a98efee 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java @@ -27,7 +27,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeInferenceUtil; -import org.apache.flink.table.types.inference.utils.AdaptedCallContext; import org.apache.flink.table.types.logical.LogicalType; import org.apache.calcite.rel.type.RelDataType; @@ -118,7 +117,7 @@ public final class TypeInferenceOperandChecker implements SqlOperandTypeChecker // -------------------------------------------------------------------------------------------- private boolean checkOperandTypesOrError(SqlCallBinding callBinding, CallContext callContext) { - final AdaptedCallContext adaptedCallContext; + final CallContext adaptedCallContext; try { adaptedCallContext = adaptArguments(typeInference, callContext, null); } catch (ValidationException e) { diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java index 6d242fc..9915b80 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java @@ -79,8 +79,11 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase { "IFNULL(f1, f0)", new BigDecimal("123.45"), DataTypes.DECIMAL(12, 2).notNull()) - .testSqlResult( - "TakesNotNull(IFNULL(f0, 12))", 12, DataTypes.INT().notNull()), + .testResult( + call("TakesNotNull", $("f0").ifNull(12)), + "TakesNotNull(IFNULL(f0, 12))", + 12, + DataTypes.INT().notNull()), TestSpec.forExpression("SQL call") .onFieldsWithData(null, 12, "Hello World") .andDataTypes(
