xuyangzhong commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1467163962
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1073,6 +1073,25 @@ void testNamedArgumentsTableFunction() throws Exception {
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
}
+ @Test
+ void testNamedArgumentsCatalogTableFunctionWithOptionalArguments() throws Exception {
Review Comment:
nit: remove `Catalog`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##########
@@ -194,14 +194,17 @@ private static <T, H extends Annotation> T defaultAsNull(
"Argument and input hints cannot be declared in the same function hint.");
}
+ Boolean[] argumentOptionals = null;
Review Comment:
Just a little curious: can this array end up containing a null Boolean element?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala:
##########
@@ -237,6 +238,9 @@ object StringCallGen {
val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
generateNonNullField(returnType, currentDatabase)
+ case DEFAULT =>
Review Comment:
The class `StringCallGen` contains the functions that take `StringData` as an input or output parameter, so the `DEFAULT` function doesn't really belong here.
How about handling it directly in `ExprCodeGenerator#generateCallExpression` and adding `public static final SqlSpecialOperator DEFAULT = SqlStdOperatorTable.DEFAULT;` to `FlinkSqlOperatorTable`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##########
@@ -67,7 +73,24 @@ static FunctionSignatureTemplate of(
throw extractionError(
"Argument name conflict, there are at least two argument names that are the same.");
}
-        return new FunctionSignatureTemplate(argumentTemplates, isVarArgs, argumentNames);
+        if (argumentOptionals != null && argumentOptionals.length != argumentTemplates.size()) {
+            throw extractionError(
+                    "Mismatch between number of argument optionals '%s' and argument types '%s'.",
+                    argumentOptionals.length, argumentTemplates.size());
+        }
+ }
+ if (argumentOptionals != null) {
+ for (int i = 0; i < argumentTemplates.size(); i++) {
+ DataType dataType = argumentTemplates.get(i).dataType;
+                if (dataType != null && !dataType.getLogicalType().isNullable()
Review Comment:
Nice validation!
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
@Override
public boolean isOptional(int i) {
- return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+ if (optionalArguments.isPresent()) {
+ return optionalArguments.get().get(i);
Review Comment:
Nit: Will a NullPointerException occur here when converting from Boolean to boolean?
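To illustrate the concern, here is a minimal standalone sketch (not Flink code; the list stands in for the result of `typeInference.getOptionalArguments()`): Java auto-unboxes the `Boolean` element to match the `boolean` return type, which throws if the element is null.

```java
import java.util.Arrays;
import java.util.List;

public class UnboxingNpe {
    // Mirrors the shape of isOptional(int i): the Boolean element is auto-unboxed to boolean.
    static boolean isOptional(List<Boolean> optionals, int i) {
        return optionals.get(i); // throws NullPointerException if the element is null
    }

    public static void main(String[] args) {
        List<Boolean> optionals = Arrays.asList(true, null);
        System.out.println(isOptional(optionals, 0)); // prints: true
        try {
            isOptional(optionals, 1);
        } catch (NullPointerException e) {
            System.out.println("NPE when unboxing a null Boolean");
        }
    }
}
```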
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
@Override
public boolean isOptional(int i) {
- return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+ if (optionalArguments.isPresent()) {
+ return optionalArguments.get().get(i);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isFixedParameters() {
+ return typeInference.getTypedArguments().isPresent();
Review Comment:
This code is fine as things currently stand. (I'm wondering, though, whether it's possible to use the `isVarArg` from `FunctionHint` here?)
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java:
##########
@@ -70,13 +73,30 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
SqlCallBinding sqlCallBinding =
                new SqlCallBinding(context.getSqlValidator(), null, callProcedure);
+
+ TypeInference typeInference =
+ procedureDefinition.getTypeInference(
+ context.getCatalogManager().getDataTypeFactory());
+        List<RexNode> reducedOperands = reduceOperands(sqlCallBinding.operands(), context);
Review Comment:
If we unify the logic that corrects the return type of `DEFAULT`, do the changes in this class become unnecessary?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
rexBuilder,
converter);
}
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+ if (!hasAssignment(call)) {
+ return Array.empty
+ }
+
+ call.getOperator.getOperandTypeChecker
+ .asInstanceOf[SqlOperandMetadata]
+ .paramTypes(validator.getTypeFactory)
+ .asScala
+ .toArray
+ }
+
+ def hasAssignment(call: SqlCall): Boolean = {
Review Comment:
Ditto. BTW, how about adding a note that this function is copied from `SqlCallBinding#hasAssignment`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -6101,6 +6105,13 @@ private void translateAgg(
}
}
RexNode convertedExpr = bb.convertExpression(operand);
+ if (SqlKind.DEFAULT == convertedExpr.getKind()) {
+ convertedExpr =
+ ((RexCall) convertedExpr)
+ .clone(
+ defaultTypes[index],
+                            ((RexCall) convertedExpr).operands);
+ }
Review Comment:
Nit: update `// ----- FLINK MODIFICATION END -----`
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
rexBuilder,
converter);
}
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+ if (!hasAssignment(call)) {
+ return Array.empty
+ }
+
+ call.getOperator.getOperandTypeChecker
+ .asInstanceOf[SqlOperandMetadata]
+ .paramTypes(validator.getTypeFactory)
+ .asScala
+ .toArray
+ }
+
+ def hasAssignment(call: SqlCall): Boolean = {
Review Comment:
Mark as private
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -465,20 +467,38 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
call.getOperands.get(1).asInstanceOf[RexLiteral])
}
+ // replace default node with right type.
+ val operands = new util.ArrayList[RexNode](call.operands)
+
// convert operands and help giving untyped NULL literals a type
- val operands = call.getOperands.zipWithIndex.map {
+ val expressions = call.getOperands.zipWithIndex.map {
// this helps e.g. for AS(null)
// we might need to extend this logic in case some rules do not create typed NULLs
case (operandLiteral: RexLiteral, 0)
if operandLiteral.getType.getSqlTypeName == SqlTypeName.NULL &&
call.getOperator.getReturnTypeInference == ReturnTypes.ARG0 =>
generateNullLiteral(resultType)
-
+ case (rexCall: RexCall, i)
+ if (rexCall.getKind == SqlKind.DEFAULT && call.getOperator
+ .isInstanceOf[BridgingSqlFunction]) => {
+ val sqlFunction = call.getOperator.asInstanceOf[BridgingSqlFunction]
+ val typeInference = sqlFunction.getTypeInference
+ val typeFactory = sqlFunction.getTypeFactory
+ if (typeInference.getTypedArguments.isPresent) {
+          val dataType = typeInference.getTypedArguments.get().get(i).getLogicalType
+ operands.set(
+ i,
+            rexCall.clone(typeFactory.createFieldTypeFromLogicalType(dataType), rexCall.operands))
Review Comment:
The return type of the DEFAULT function is now overridden both in SqlToRel and here, which seems a bit too scattered. Can we consolidate this into just the SqlToRel phase?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
rexBuilder,
converter);
}
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
Review Comment:
Nit: add some comments to this function.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -101,7 +104,15 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
@Override
public SqlOperandCountRange getOperandCountRange() {
- return countRange;
+ if (typeInference.getOptionalArguments().isPresent()
+ && typeInference.getOptionalArguments().get().stream()
+ .anyMatch(Boolean::booleanValue)) {
+            ArgumentCount argumentCount = ConstantArgumentCount.between(0, countRange.getMax());
Review Comment:
I think the minCount should be the count of `false` in `typeInference.getOptionalArguments()`, right?
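For illustration, a minimal sketch of the intended computation (a hypothetical helper, not the actual Flink code): the minimum operand count would be the number of mandatory, i.e. non-optional, arguments rather than 0.

```java
import java.util.Arrays;
import java.util.List;

public class MinOperandCount {
    // Hypothetical: minimum operand count = number of non-optional arguments.
    static int minCount(List<Boolean> optionals) {
        return (int) optionals.stream().filter(optional -> !optional).count();
    }

    public static void main(String[] args) {
        // f1 optional, f2 mandatory, f3 optional -> at least 1 operand is required
        System.out.println(minCount(Arrays.asList(true, false, true))); // prints: 1
    }
}
```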
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##########
@@ -1810,4 +1844,30 @@ public String eval(String f1, String f2) {
return "";
}
}
+
+    private static class ArgumentHintScalarFunctionNotNullTypeWithOptionals extends ScalarFunction {
+ @FunctionHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING NOT NULL"),
+ name = "f1",
+ isOptional = true),
+                @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2", isOptional = true)
+ })
+ public String eval(String f1, Integer f2) {
Review Comment:
What happens when a parameter that is set as `isOptional = true` has a primitive type, such as `int`? Can we add an early check to make the error message clearer, instead of potentially hitting strange errors during code generation or reflection?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java:
##########
@@ -286,6 +305,15 @@ private static void configureTypedArguments(
private static TypeStrategy translateResultTypeStrategy(
            Map<FunctionSignatureTemplate, FunctionResultTemplate> resultMapping) {
+ if (resultMapping.size() == 1) {
Review Comment:
This part of the code should not be modified. Instead, we should change the `argumentDataTypes#size()` logic in the `OperatorBindingCallContext` constructor:
```
this.argumentDataTypes =
        new AbstractList<DataType>() {
            @Override
            public DataType get(int pos) {
                if (binding instanceof SqlCallBinding) {
                    SqlCallBinding sqlCallBinding = (SqlCallBinding) binding;
                    List<SqlNode> operands = sqlCallBinding.operands();
                    final RelDataType relDataType =
                            sqlCallBinding
                                    .getValidator()
                                    .deriveType(sqlCallBinding.getScope(), operands.get(pos));
                    final LogicalType logicalType =
                            FlinkTypeFactory.toLogicalType(relDataType);
                    return TypeConversions.fromLogicalToDataType(logicalType);
                } else {
                    final LogicalType logicalType =
                            toLogicalType(binding.getOperandType(pos));
                    return fromLogicalToDataType(logicalType);
                }
            }

            @Override
            public int size() {
                if (binding instanceof SqlCallBinding) {
                    return ((SqlCallBinding) binding).operands().size();
                } else {
                    return binding.getOperandCount();
                }
            }
        };
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]