This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4cd43fc09bd [FLINK-34058][table] Support optional parameters for named
arguments (#24183)
4cd43fc09bd is described below
commit 4cd43fc09bd6c2e4806792fa2cce71f54ec1a9dd
Author: Feng Jin <[email protected]>
AuthorDate: Mon Jan 29 23:40:07 2024 +0800
[FLINK-34058][table] Support optional parameters for named arguments
(#24183)
---------
Co-authored-by: xuyang <[email protected]>
Co-authored-by: Shengkai <[email protected]>
---
.../types/extraction/BaseMappingExtractor.java | 51 ++-
.../extraction/FunctionSignatureTemplate.java | 30 +-
.../table/types/extraction/FunctionTemplate.java | 15 +-
.../types/extraction/TypeInferenceExtractor.java | 22 +-
.../flink/table/types/inference/TypeInference.java | 26 ++
.../extraction/TypeInferenceExtractorTest.java | 347 ++++++++++++++++++++-
.../apache/calcite/sql2rel/SqlToRelConverter.java | 21 +-
.../planner/calcite/FlinkConvertletTable.java | 39 +++
.../planner/calcite/FlinkOperatorBinding.java | 181 +++++++++++
.../inference/CallBindingCallContext.java | 21 +-
.../inference/OperatorBindingCallContext.java | 31 +-
.../inference/TypeInferenceOperandChecker.java | 41 ++-
.../functions/sql/FlinkSqlOperatorTable.java | 6 +
.../operations/PlannerCallProcedureOperation.java | 4 +-
.../converters/SqlProcedureCallConverter.java | 36 ++-
.../planner/codegen/calls/StringCallGen.scala | 5 +-
.../factories/TestProcedureCatalogFactory.java | 18 ++
.../planner/runtime/stream/sql/FunctionITCase.java | 140 ++++++++-
.../runtime/stream/sql/ProcedureITCase.java | 20 +-
19 files changed, 976 insertions(+), 78 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
index fdd12e1d795..6069a0f729c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -143,6 +144,9 @@ abstract class BaseMappingExtractor {
// check if the method can be called
verifyMappingForMethod(correctMethod,
collectedMappingsPerMethod, verification);
+ // check if we declare optional on a primitive type parameter
+ verifyOptionalOnPrimitiveParameter(correctMethod,
collectedMappingsPerMethod);
+
// check if method strategies conflict with function strategies
collectedMappingsPerMethod.forEach(
(signature, result) -> putMapping(collectedMappings,
signature, result));
@@ -323,6 +327,40 @@ abstract class BaseMappingExtractor {
verification.verify(method, signature.toClass(),
result.toClass()));
}
+ private void verifyOptionalOnPrimitiveParameter(
+ Method method,
+ Map<FunctionSignatureTemplate, FunctionResultTemplate>
collectedMappingsPerMethod) {
+ collectedMappingsPerMethod
+ .keySet()
+ .forEach(
+ signature -> {
+ Boolean[] argumentOptional =
signature.argumentOptionals;
+ // Here we restrict that the argument must contain
optional parameters
+ // in order to obtain the
FunctionSignatureTemplate of the method for
+ // verification. Therefore, the extract method
will only be called once.
+ // If no function hint is set, this verify will
not be executed.
+ if (argumentOptional != null
+ && Arrays.stream(argumentOptional)
+ .anyMatch(Boolean::booleanValue)) {
+ FunctionSignatureTemplate
functionResultTemplate =
+ signatureExtraction.extract(this,
method);
+ for (int i = 0; i < argumentOptional.length;
i++) {
+ DataType dataType =
+
functionResultTemplate.argumentTemplates.get(i)
+ .dataType;
+ if (dataType != null
+ && argumentOptional[i]
+ && dataType.getConversionClass()
!= null
+ &&
dataType.getConversionClass().isPrimitive()) {
+ throw extractionError(
+ "Argument at position %d is
optional but a primitive type doesn't accept null value.",
+ i);
+ }
+ }
+ }
+ });
+ }
+
//
--------------------------------------------------------------------------------------------
// Context sensitive extraction and verification logic
//
--------------------------------------------------------------------------------------------
@@ -338,7 +376,10 @@ abstract class BaseMappingExtractor {
final String[] argumentNames = extractArgumentNames(method,
offset);
- return FunctionSignatureTemplate.of(parameterTypes,
method.isVarArgs(), argumentNames);
+ final Boolean[] argumentOptionals =
extractArgumentOptionals(method, offset);
+
+ return FunctionSignatureTemplate.of(
+ parameterTypes, method.isVarArgs(), argumentNames,
argumentOptionals);
};
}
@@ -416,6 +457,14 @@ abstract class BaseMappingExtractor {
}
}
+ static Boolean[] extractArgumentOptionals(Method method, int offset) {
+ return Arrays.stream(method.getParameters())
+ .skip(offset)
+ .map(parameter -> parameter.getAnnotation(ArgumentHint.class))
+ .map(argumentHint -> argumentHint != null &&
argumentHint.isOptional())
+ .toArray(Boolean[]::new);
+ }
+
protected static ValidationException createMethodNotFoundError(
String methodName, Class<?>[] parameters, @Nullable Class<?>
returnType) {
return extractionError(
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
index 2efbbd0e8f6..855f11e6fa8 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.types.extraction;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
final @Nullable String[] argumentNames;
+ final Boolean[] argumentOptionals;
+
private FunctionSignatureTemplate(
List<FunctionArgumentTemplate> argumentTemplates,
boolean isVarArgs,
- @Nullable String[] argumentNames) {
+ @Nullable String[] argumentNames,
+ Boolean[] argumentOptionals) {
this.argumentTemplates = argumentTemplates;
this.isVarArgs = isVarArgs;
this.argumentNames = argumentNames;
+ this.argumentOptionals = argumentOptionals;
}
static FunctionSignatureTemplate of(
List<FunctionArgumentTemplate> argumentTemplates,
boolean isVarArgs,
- @Nullable String[] argumentNames) {
+ @Nullable String[] argumentNames,
+ Boolean[] argumentOptionals) {
if (argumentNames != null && argumentNames.length !=
argumentTemplates.size()) {
throw extractionError(
"Mismatch between number of argument names '%s' and
argument types '%s'.",
@@ -67,7 +73,25 @@ final class FunctionSignatureTemplate {
throw extractionError(
"Argument name conflict, there are at least two argument
names that are the same.");
}
- return new FunctionSignatureTemplate(argumentTemplates, isVarArgs,
argumentNames);
+ if (argumentOptionals != null && argumentOptionals.length !=
argumentTemplates.size()) {
+ throw extractionError(
+ "Mismatch between number of argument optionals '%s' and
argument types '%s'.",
+ argumentOptionals.length, argumentTemplates.size());
+ }
+ if (argumentOptionals != null) {
+ for (int i = 0; i < argumentTemplates.size(); i++) {
+ DataType dataType = argumentTemplates.get(i).dataType;
+ if (dataType != null
+ && !dataType.getLogicalType().isNullable()
+ && argumentOptionals[i]) {
+ throw extractionError(
+ "Argument at position %s is optional but its type
doesn't accept null value.",
+ i);
+ }
+ }
+ }
+ return new FunctionSignatureTemplate(
+ argumentTemplates, isVarArgs, argumentNames,
argumentOptionals);
}
InputTypeStrategy toInputTypeStrategy() {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
index 80e84b9137a..e18f5991477 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
@@ -194,14 +194,17 @@ final class FunctionTemplate {
"Argument and input hints cannot be declared in the same
function hint.");
}
+ Boolean[] argumentOptionals = null;
if (argumentHints != null) {
argumentHintNames = new String[argumentHints.length];
argumentHintTypes = new DataTypeHint[argumentHints.length];
+ argumentOptionals = new Boolean[argumentHints.length];
boolean allArgumentNameNotSet = true;
for (int i = 0; i < argumentHints.length; i++) {
ArgumentHint argumentHint = argumentHints[i];
argumentHintNames[i] = defaultAsNull(argumentHint,
ArgumentHint::name);
argumentHintTypes[i] = defaultAsNull(argumentHint,
ArgumentHint::type);
+ argumentOptionals[i] = argumentHint.isOptional();
if (argumentHintTypes[i] == null) {
throw extractionError("The type of the argument at
position %d is not set.", i);
}
@@ -216,12 +219,13 @@ final class FunctionTemplate {
argumentHintNames = null;
}
} else {
+ if (inputs == null) {
+ return null;
+ }
argumentHintTypes = inputs;
argumentHintNames = argumentNames;
- }
-
- if (argumentHintTypes == null) {
- return null;
+ argumentOptionals = new Boolean[inputs.length];
+ Arrays.fill(argumentOptionals, false);
}
return FunctionSignatureTemplate.of(
@@ -229,7 +233,8 @@ final class FunctionTemplate {
.map(dataTypeHint ->
createArgumentTemplate(typeFactory, dataTypeHint))
.collect(Collectors.toList()),
isVarArg,
- argumentHintNames);
+ argumentHintNames,
+ argumentOptionals);
}
private static FunctionArgumentTemplate createArgumentTemplate(
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
index b817efa2b84..1b54abd5574 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
@@ -229,6 +229,7 @@ public final class TypeInferenceExtractor {
final TypeInference.Builder builder = TypeInference.newBuilder();
configureNamedArguments(builder, outputMapping);
+ configureOptionalArguments(builder, outputMapping);
configureTypedArguments(builder, outputMapping);
builder.inputTypeStrategy(translateInputTypeStrategy(outputMapping));
@@ -268,6 +269,24 @@ public final class TypeInferenceExtractor {
builder.namedArguments(argumentNames.iterator().next());
}
+ private static void configureOptionalArguments(
+ TypeInference.Builder builder,
+ Map<FunctionSignatureTemplate, FunctionResultTemplate>
outputMapping) {
+ final Set<FunctionSignatureTemplate> signatures =
outputMapping.keySet();
+ if (signatures.stream().anyMatch(s -> s.isVarArgs || s.argumentNames
== null)) {
+ return;
+ }
+ final List<List<Boolean>> argumentOptional =
+ signatures.stream()
+ .filter(s -> s.argumentOptionals != null)
+ .map(s -> Arrays.asList(s.argumentOptionals))
+ .collect(Collectors.toList());
+ if (argumentOptional.size() != 1 || argumentOptional.size() !=
signatures.size()) {
+ return;
+ }
+ builder.optionalArguments(argumentOptional.get(0));
+ }
+
private static void configureTypedArguments(
TypeInference.Builder builder,
Map<FunctionSignatureTemplate, FunctionResultTemplate>
outputMapping) {
@@ -291,7 +310,8 @@ public final class TypeInferenceExtractor {
.collect(
Collectors.toMap(
e -> e.getKey().toInputTypeStrategy(),
- e -> e.getValue().toTypeStrategy()));
+ e -> e.getValue().toTypeStrategy(),
+ (t1, t2) -> t2));
return TypeStrategies.mapping(mappings);
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
index 921282b1256..e7bdaf77a93 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
@@ -46,6 +46,8 @@ public final class TypeInference {
private final @Nullable List<String> namedArguments;
+ private final @Nullable List<Boolean> optionalArguments;
+
private final @Nullable List<DataType> typedArguments;
private final InputTypeStrategy inputTypeStrategy;
@@ -56,11 +58,13 @@ public final class TypeInference {
private TypeInference(
@Nullable List<String> namedArguments,
+ @Nullable List<Boolean> optionalArguments,
@Nullable List<DataType> typedArguments,
InputTypeStrategy inputTypeStrategy,
@Nullable TypeStrategy accumulatorTypeStrategy,
TypeStrategy outputTypeStrategy) {
this.namedArguments = namedArguments;
+ this.optionalArguments = optionalArguments;
this.typedArguments = typedArguments;
this.inputTypeStrategy = inputTypeStrategy;
this.accumulatorTypeStrategy = accumulatorTypeStrategy;
@@ -88,6 +92,10 @@ public final class TypeInference {
return Optional.ofNullable(typedArguments);
}
+ public Optional<List<Boolean>> getOptionalArguments() {
+ return Optional.ofNullable(optionalArguments);
+ }
+
public InputTypeStrategy getInputTypeStrategy() {
return inputTypeStrategy;
}
@@ -108,6 +116,8 @@ public final class TypeInference {
private @Nullable List<String> namedArguments;
+ private @Nullable List<Boolean> optionalArguments;
+
private @Nullable List<DataType> typedArguments;
private InputTypeStrategy inputTypeStrategy =
InputTypeStrategies.WILDCARD;
@@ -140,6 +150,21 @@ public final class TypeInference {
return namedArguments(Arrays.asList(argumentNames));
}
+ /**
+ * Sets the list of argument optionals for specifying optional
arguments in the input
+ * signature explicitly.
+ *
+ * <p>This information is useful for SQL's concept of named arguments
using the assignment
+ * operator. The optionals are used to determine whether an argument
is optional or required
+ * in the function call.
+ */
+ public Builder optionalArguments(List<Boolean> optionalArguments) {
+ this.optionalArguments =
+ Preconditions.checkNotNull(
+ optionalArguments, "List of argument optionals
must not be null.");
+ return this;
+ }
+
/**
* Sets the list of argument types for specifying a fixed, not
overloaded, not vararg input
* signature explicitly.
@@ -198,6 +223,7 @@ public final class TypeInference {
public TypeInference build() {
return new TypeInference(
namedArguments,
+ optionalArguments,
typedArguments,
inputTypeStrategy,
accumulatorTypeStrategy,
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
index 483e156636e..dc3b36c9320 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
@@ -573,6 +573,7 @@ class TypeInferenceExtractorTest {
ArgumentHintOnParameterScalarFunction.class)
.expectNamedArguments("in1", "in2")
.expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(false, false)
.expectOutputMapping(
InputTypeStrategies.sequence(
new String[] {"in1", "in2"},
@@ -600,7 +601,8 @@ class TypeInferenceExtractorTest {
"A valid scalar class that declare
FunctionHint for both class and method in the same class.",
ValidFunctionHintOnClassAndMethod.class)
.expectNamedArguments("f1", "f2")
- .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT()),
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(true, true),
TestSpec.forScalarFunction(
"The FunctionHint of the function conflicts
with the method.",
ScalarFunctionWithFunctionHintConflictMethod.class)
@@ -609,7 +611,23 @@ class TypeInferenceExtractorTest {
// For function with overloaded function, argument name will
be empty
TestSpec.forScalarFunction(
"Scalar function with overloaded functions and
arguments hint declared.",
-
ArgumentsHintScalarFunctionWithOverloadedFunction.class));
+
ArgumentsHintScalarFunctionWithOverloadedFunction.class),
+ TestSpec.forScalarFunction(
+ "Scalar function with argument type not null
but optional.",
+
ArgumentHintNotNullTypeWithOptionalsScalarFunction.class)
+ .expectErrorMessage(
+ "Argument at position 0 is optional but its
type doesn't accept null value."),
+ TestSpec.forScalarFunction(
+ "Scalar function with arguments hint and
variable length args",
+ ArgumentHintVariableLengthScalarFunction.class)
+ .expectOutputMapping(
+ InputTypeStrategies.varyingSequence(
+ new String[] {"f1", "f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(DataTypes.STRING())));
}
private static Stream<TestSpec> procedureSpecs() {
@@ -866,7 +884,114 @@ class TypeInferenceExtractorTest {
// no implementation
TestSpec.forProcedure(MissingMethodProcedure.class)
.expectErrorMessage(
- "Could not find a publicly accessible method
named 'call'."));
+ "Could not find a publicly accessible method
named 'call'."),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on method",
+ ArgumentHintOnMethodProcedure.class)
+ .expectNamedArguments("f1", "f2")
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(true, true)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"f1", "f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on class",
+ ArgumentHintOnClassProcedure.class)
+ .expectNamedArguments("f1", "f2")
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(true, true)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"f1", "f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on parameter",
+ ArgumentHintOnParameterProcedure.class)
+ .expectNamedArguments("parameter_f1", "parameter_f2")
+ .expectTypedArguments(
+ DataTypes.STRING(),
DataTypes.INT().bridgedTo(int.class))
+ .expectOptionalArguments(true, false)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"parameter_f1",
"parameter_f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+ InputTypeStrategies.explicit(
+
DataTypes.INT().bridgedTo(int.class))
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on method and parameter",
+
ArgumentHintOnMethodAndParameterProcedure.class)
+ .expectNamedArguments("local_f1", "local_f2")
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(true, true)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"local_f1", "local_f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on class and method",
+ ArgumentHintOnClassAndMethodProcedure.class)
+ .expectNamedArguments("global_f1", "global_f2")
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(false, false)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"global_f1",
"global_f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
on class and method and parameter",
+
ArgumentHintOnClassAndMethodAndParameterProcedure.class)
+ .expectNamedArguments("global_f1", "global_f2")
+ .expectTypedArguments(DataTypes.STRING(),
DataTypes.INT())
+ .expectOptionalArguments(false, false)
+ .expectOutputMapping(
+ InputTypeStrategies.sequence(
+ new String[] {"global_f1",
"global_f2"},
+ new ArgumentTypeStrategy[] {
+
InputTypeStrategies.explicit(DataTypes.STRING()),
+
InputTypeStrategies.explicit(DataTypes.INT())
+ }),
+ TypeStrategies.explicit(
+
DataTypes.INT().notNull().bridgedTo(int.class))),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument hint
type not null but optional",
+ ArgumentHintNotNullWithOptionalProcedure.class)
+ .expectErrorMessage(
+ "Argument at position 1 is optional but its
type doesn't accept null value."),
+ TestSpec.forProcedure(
+ "Named arguments procedure with argument name
conflict",
+ ArgumentHintNameConflictProcedure.class)
+ .expectErrorMessage(
+ "Argument name conflict, there are at least
two argument names that are the same."),
+ TestSpec.forProcedure(
+ "Named arguments procedure with optional type
on primitive type",
+
ArgumentHintOptionalOnPrimitiveParameterConflictProcedure.class)
+ .expectErrorMessage(
+ "Argument at position 1 is optional but a
primitive type doesn't accept null value."));
}
@ParameterizedTest(name = "{index}: {0}")
@@ -881,6 +1006,15 @@ class TypeInferenceExtractorTest {
}
}
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("testData")
+ void testArgumentOptionals(TestSpec testSpec) {
+ if (testSpec.expectedArgumentOptionals != null) {
+
assertThat(testSpec.typeInferenceExtraction.get().getOptionalArguments())
+
.isEqualTo(Optional.of(testSpec.expectedArgumentOptionals));
+ }
+ }
+
@ParameterizedTest(name = "{index}: {0}")
@MethodSource("testData")
void testArgumentTypes(TestSpec testSpec) {
@@ -956,6 +1090,8 @@ class TypeInferenceExtractorTest {
@Nullable List<String> expectedArgumentNames;
+ @Nullable List<Boolean> expectedArgumentOptionals;
+
@Nullable List<DataType> expectedArgumentTypes;
Map<InputTypeStrategy, TypeStrategy> expectedAccumulatorStrategies;
@@ -1045,6 +1181,11 @@ class TypeInferenceExtractorTest {
return this;
}
+ TestSpec expectOptionalArguments(Boolean... expectedArgumentOptionals)
{
+ this.expectedArgumentOptionals =
Arrays.asList(expectedArgumentOptionals);
+ return this;
+ }
+
TestSpec expectTypedArguments(DataType... expectedArgumentTypes) {
this.expectedArgumentTypes = Arrays.asList(expectedArgumentTypes);
return this;
@@ -1625,6 +1766,176 @@ class TypeInferenceExtractorTest {
}
}
+ private static class ArgumentHintOnMethodProcedure implements Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name =
"f2", isOptional = true)
+ })
+ public int[] call(Object procedureContext, String f1, Integer f2) {
+ return null;
+ }
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2",
isOptional = true)
+ })
+ private static class ArgumentHintOnClassProcedure implements Procedure {
+ public int[] call(Object procedureContext, String f1, Integer f2) {
+ return null;
+ }
+ }
+
+ private static class ArgumentHintOnParameterProcedure implements Procedure
{
+ public int[] call(
+ Object procedureContext,
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "parameter_f1",
+ isOptional = true)
+ String f1,
+ @ArgumentHint(
+ type = @DataTypeHint("INT"),
+ name = "parameter_f2",
+ isOptional = false)
+ int f2) {
+ return null;
+ }
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "global_f1",
+ isOptional = false),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "global_f2",
+ isOptional = false)
+ })
+ private static class ArgumentHintOnClassAndMethodProcedure implements
Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "local_f1",
+ isOptional = true),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "local_f2",
+ isOptional = true)
+ })
+ public int[] call(Object procedureContext, String f1, Integer f2) {
+ return null;
+ }
+ }
+
+ private static class ArgumentHintOnMethodAndParameterProcedure implements
Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "local_f1",
+ isOptional = true),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "local_f2",
+ isOptional = true)
+ })
+ public int[] call(
+ Object procedureContext,
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "parameter_f1",
+ isOptional = true)
+ String f1,
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "parameter_f2",
+ isOptional = false)
+ Integer f2) {
+ return null;
+ }
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "global_f1",
+ isOptional = false),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "global_f2",
+ isOptional = false)
+ })
+ private static class ArgumentHintOnClassAndMethodAndParameterProcedure
implements Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "local_f1",
+ isOptional = true),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER"),
+ name = "local_f2",
+ isOptional = true)
+ })
+ public int[] call(
+ Object procedureContext,
+ @ArgumentHint(
+ type = @DataTypeHint("STRING"),
+ name = "parameter_f1",
+ isOptional = false)
+ String f1,
+ Integer f2) {
+ return null;
+ }
+ }
+
+ private static class ArgumentHintNotNullWithOptionalProcedure implements
Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER NOT NULL"),
+ name = "f2",
+ isOptional = true)
+ })
+ public int[] call(Object procedureContext, String f1, Integer f2) {
+ return null;
+ }
+ }
+
+ private static class ArgumentHintNameConflictProcedure implements
Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(
+ type = @DataTypeHint("INTEGER NOT NULL"),
+ name = "f1",
+ isOptional = true)
+ })
+ public int[] call(Object procedureContext, String f1, Integer f2) {
+ return null;
+ }
+ }
+
+ private static class
ArgumentHintOptionalOnPrimitiveParameterConflictProcedure
+ implements Procedure {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name =
"f2", isOptional = true)
+ })
+ public int[] call(Object procedureContext, String f1, int f2) {
+ return null;
+ }
+ }
+
private static class ZeroArgFunctionAsync extends AsyncScalarFunction {
public void eval(CompletableFuture<Integer> f) {}
}
@@ -1761,8 +2072,8 @@ class TypeInferenceExtractorTest {
@FunctionHint(
argument = {
- @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1"),
- @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2")
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1",
isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2",
isOptional = true)
})
private static class ValidFunctionHintOnClassAndMethod extends
ScalarFunction {
@FunctionHint(
@@ -1810,4 +2121,30 @@ class TypeInferenceExtractorTest {
return "";
}
}
+
+ private static class ArgumentHintNotNullTypeWithOptionalsScalarFunction
extends ScalarFunction {
+ @FunctionHint(
+ argument = {
+ @ArgumentHint(
+ type = @DataTypeHint("STRING NOT NULL"),
+ name = "f1",
+ isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name =
"f2", isOptional = true)
+ })
+ public String eval(String f1, Integer f2) {
+ return "";
+ }
+ }
+
+ private static class ArgumentHintVariableLengthScalarFunction extends
ScalarFunction {
+ @FunctionHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1"),
+ @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2")
+ },
+ isVarArgs = true)
+ public String eval(String f1, Integer... f2) {
+ return "";
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index bccced1342b..0c5d7dda150 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -18,6 +18,7 @@ package org.apache.calcite.sql2rel;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
import
org.apache.flink.table.planner.hint.ClearQueryHintsWithInvalidPropagationShuttle;
import org.apache.flink.table.planner.hint.FlinkHints;
@@ -241,7 +242,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* <li>Added in FLINK-32474: Lines 2875 ~ 2887
* <li>Added in FLINK-32474: Lines 2987 ~ 3021
* <li>Added in FLINK-20873: Lines 5519 ~ 5528
- * <li>Added in FLINK-34057: Lines 6089 ~ 6092
+ * <li>Added in FLINK-34057, FLINK-34058: Lines 6090 ~ 6116
* </ol>
*/
@SuppressWarnings("UnstableApiUsage")
@@ -6087,10 +6088,13 @@ public class SqlToRelConverter {
// switch out of agg mode
bb.agg = null;
// ----- FLINK MODIFICATION BEGIN -----
- for (SqlNode operand :
- new SqlCallBinding(validator(),
aggregatingSelectScope, call).operands())
- // ----- FLINK MODIFICATION END -----
- {
+ SqlCallBinding sqlCallBinding =
+ new SqlCallBinding(validator(),
aggregatingSelectScope, call);
+ List<SqlNode> sqlNodes = sqlCallBinding.operands();
+ FlinkOperatorBinding flinkOperatorBinding =
+ new FlinkOperatorBinding(sqlCallBinding);
+ for (int i = 0; i < sqlNodes.size(); i++) {
+ SqlNode operand = sqlNodes.get(i);
// special case for COUNT(*): delete the *
if (operand instanceof SqlIdentifier) {
SqlIdentifier id = (SqlIdentifier) operand;
@@ -6101,8 +6105,15 @@ public class SqlToRelConverter {
}
}
RexNode convertedExpr = bb.convertExpression(operand);
+ if (convertedExpr.getKind() == SqlKind.DEFAULT) {
+ RelDataType relDataType =
flinkOperatorBinding.getOperandType(i);
+ convertedExpr =
+ ((RexCall) convertedExpr)
+ .clone(relDataType, ((RexCall)
convertedExpr).operands);
+ }
args.add(lookupOrCreateGroupExpr(convertedExpr));
}
+ // ----- FLINK MODIFICATION END -----
if (filter != null) {
RexNode convertedExpr = bb.convertExpression(filter);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
index 8f319dd1e23..0a22585ea5c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
@@ -43,8 +44,10 @@ import org.apache.calcite.sql2rel.SqlRexConvertlet;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -69,6 +72,10 @@ public class FlinkConvertletTable implements
SqlRexConvertletTable {
return this::convertSetSemanticsWindowTableFunction;
}
+ if (isContainsDefaultNode(call)) {
+ return this::convertSqlCallWithDefaultNode;
+ }
+
return StandardConvertletTable.INSTANCE.get(call);
}
@@ -113,6 +120,12 @@ public class FlinkConvertletTable implements
SqlRexConvertletTable {
return !operands.isEmpty() && operands.get(0).getKind() ==
SqlKind.SET_SEMANTICS_TABLE;
}
+ private boolean isContainsDefaultNode(SqlCall call) {
+ return call.getOperandList().stream()
+ .filter(Objects::nonNull)
+ .anyMatch(operand -> operand.getKind() == SqlKind.DEFAULT);
+ }
+
/**
* Due to CALCITE-6204, we need to manually extract partition keys and
order keys and convert
* them to {@link RexSetSemanticsTableCall}.
@@ -202,4 +215,30 @@ public class FlinkConvertletTable implements
SqlRexConvertletTable {
// should not happen
throw new TableException("Unsupported partition key with type: " +
e.getKind());
}
+
+ /**
+ * When the SqlCall contains a default operator, the type of the default
node to ANY after
+ * converted to rel node. However, the ANY type cannot pass various checks
well and cannot adapt
+ * well to types in flink. Therefore, we replace the ANY type with the
argument type obtained
+ * from the operator.
+ */
+ private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final
SqlCall call) {
+ RexNode rexCall = StandardConvertletTable.INSTANCE.convertCall(cx,
call);
+ SqlCallBinding sqlCallBinding = new SqlCallBinding(cx.getValidator(),
null, call);
+ FlinkOperatorBinding flinkOperatorBinding = new
FlinkOperatorBinding(sqlCallBinding);
+ if (rexCall instanceof RexCall) {
+ List<RexNode> operands = new ArrayList<>(((RexCall)
rexCall).operands);
+ for (int i = 0; i < operands.size(); i++) {
+ RexNode rexNode = operands.get(i);
+ if (rexNode.getKind() == SqlKind.DEFAULT && rexNode instanceof
RexCall) {
+ RelDataType relDataType =
flinkOperatorBinding.getOperandType(i);
+ operands.set(
+ i,
+ ((RexCall) rexNode).clone(relDataType, ((RexCall)
rexNode).operands));
+ }
+ }
+ return ((RexCall) rexCall).clone(rexCall.getType(), operands);
+ }
+ return rexCall;
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
new file mode 100644
index 00000000000..41e50d8739a
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The proxy implementation of {@link SqlOperatorBinding} can be used to
correct the operator type
+ * when using named parameter.
+ */
+public class FlinkOperatorBinding extends SqlOperatorBinding {
+
+ private final SqlOperatorBinding sqlOperatorBinding;
+
+ private final List<RelDataType> argumentTypes;
+
+ public FlinkOperatorBinding(SqlOperatorBinding sqlOperatorBinding) {
+ super(sqlOperatorBinding.getTypeFactory(),
sqlOperatorBinding.getOperator());
+ this.sqlOperatorBinding = sqlOperatorBinding;
+ this.argumentTypes = getArgumentTypes();
+ }
+
+ @Override
+ public int getOperandCount() {
+ if (!argumentTypes.isEmpty()) {
+ return argumentTypes.size();
+ } else {
+ return sqlOperatorBinding.getOperandCount();
+ }
+ }
+
+ @Override
+ public RelDataType getOperandType(int ordinal) {
+ if (sqlOperatorBinding instanceof SqlCallBinding) {
+ SqlNode sqlNode = ((SqlCallBinding)
sqlOperatorBinding).operands().get(ordinal);
+ if (sqlNode.getKind() == SqlKind.DEFAULT &&
!argumentTypes.isEmpty()) {
+ return argumentTypes.get(ordinal);
+ } else {
+ return ((SqlCallBinding) sqlOperatorBinding)
+ .getValidator()
+ .deriveType(((SqlCallBinding)
sqlOperatorBinding).getScope(), sqlNode);
+ }
+ } else if (sqlOperatorBinding instanceof RexCallBinding) {
+ RexNode rexNode = ((RexCallBinding)
sqlOperatorBinding).operands().get(ordinal);
+ if (rexNode.getKind() == SqlKind.DEFAULT &&
!argumentTypes.isEmpty()) {
+ return argumentTypes.get(ordinal);
+ } else {
+ return rexNode.getType();
+ }
+ }
+ return sqlOperatorBinding.getOperandType(ordinal);
+ }
+
+ @Override
+ public CalciteException newError(Resources.ExInst<SqlValidatorException>
e) {
+ return sqlOperatorBinding.newError(e);
+ }
+
+ @Override
+ public int getGroupCount() {
+ return sqlOperatorBinding.getGroupCount();
+ }
+
+ @Override
+ public boolean hasFilter() {
+ return sqlOperatorBinding.hasFilter();
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return sqlOperatorBinding.getOperator();
+ }
+
+ @Override
+ public RelDataTypeFactory getTypeFactory() {
+ return sqlOperatorBinding.getTypeFactory();
+ }
+
+ @Override
+ public @Nullable String getStringLiteralOperand(int ordinal) {
+ return sqlOperatorBinding.getStringLiteralOperand(ordinal);
+ }
+
+ @Override // to be removed before 2.0
+ public int getIntLiteralOperand(int ordinal) {
+ return sqlOperatorBinding.getIntLiteralOperand(ordinal);
+ }
+
+ @Override
+ public boolean isOperandNull(int ordinal, boolean allowCast) {
+ return sqlOperatorBinding.isOperandNull(ordinal, allowCast);
+ }
+
+ @Override
+ public boolean isOperandLiteral(int ordinal, boolean allowCast) {
+ return sqlOperatorBinding.isOperandLiteral(ordinal, allowCast);
+ }
+
+ @Override
+ public @Nullable Object getOperandLiteralValue(int ordinal, RelDataType
type) {
+ return sqlOperatorBinding.getOperandLiteralValue(ordinal, type);
+ }
+
+ @Override
+ public @Nullable Comparable getOperandLiteralValue(int ordinal) {
+ return sqlOperatorBinding.getOperandLiteralValue(ordinal);
+ }
+
+ @Override
+ public SqlMonotonicity getOperandMonotonicity(int ordinal) {
+ return sqlOperatorBinding.getOperandMonotonicity(ordinal);
+ }
+
+ @Override
+ public List<RelDataType> collectOperandTypes() {
+ return sqlOperatorBinding.collectOperandTypes();
+ }
+
+ @Override
+ public @Nullable RelDataType getCursorOperand(int ordinal) {
+ return sqlOperatorBinding.getCursorOperand(ordinal);
+ }
+
+ @Override
+ public @Nullable String getColumnListParamInfo(
+ int ordinal, String paramName, List<String> columnList) {
+ return sqlOperatorBinding.getColumnListParamInfo(ordinal, paramName,
columnList);
+ }
+
+ @Override
+ public <T extends Object> @Nullable T getOperandLiteralValue(int ordinal,
Class<T> clazz) {
+ return sqlOperatorBinding.getOperandLiteralValue(ordinal, clazz);
+ }
+
+ private List<RelDataType> getArgumentTypes() {
+ SqlOperandTypeChecker sqlOperandTypeChecker =
getOperator().getOperandTypeChecker();
+ if (sqlOperandTypeChecker != null
+ && sqlOperandTypeChecker.isFixedParameters()
+ && sqlOperandTypeChecker instanceof SqlOperandMetadata) {
+ SqlOperandMetadata sqlOperandMetadata =
+ ((SqlOperandMetadata)
getOperator().getOperandTypeChecker());
+ return sqlOperandMetadata.paramTypes(getTypeFactory());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index cd909ba79db..65aefccad2e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,28 +53,29 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
private final List<DataType> argumentDataTypes;
+ private final SqlOperatorBinding binding;
+
private final @Nullable DataType outputType;
public CallBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
- SqlCallBinding binding,
+ SqlCallBinding sqlCallBinding,
@Nullable RelDataType outputType) {
super(
dataTypeFactory,
definition,
- binding.getOperator().getNameAsId().toString(),
- binding.getGroupCount() > 0);
+ sqlCallBinding.getOperator().getNameAsId().toString(),
+ sqlCallBinding.getGroupCount() > 0);
- this.adaptedArguments = binding.operands(); // reorders the operands
+ this.adaptedArguments = sqlCallBinding.operands(); // reorders the
operands
+ this.binding = new FlinkOperatorBinding(sqlCallBinding);
this.argumentDataTypes =
new AbstractList<DataType>() {
@Override
public DataType get(int pos) {
- final RelDataType relDataType =
- binding.getValidator()
- .deriveType(binding.getScope(),
adaptedArguments.get(pos));
- final LogicalType logicalType =
FlinkTypeFactory.toLogicalType(relDataType);
+ final LogicalType logicalType =
+
FlinkTypeFactory.toLogicalType(binding.getOperandType(pos));
return
TypeConversions.fromLogicalToDataType(logicalType);
}
@@ -81,7 +84,7 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
return binding.getOperandCount();
}
};
- this.outputType = convertOutputType(binding, outputType);
+ this.outputType = convertOutputType(sqlCallBinding, outputType);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index 103d37cc03e..9961354e304 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -21,15 +21,13 @@ package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorBinding;
import javax.annotation.Nullable;
@@ -54,35 +52,22 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
public OperatorBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
- SqlOperatorBinding binding,
+ SqlOperatorBinding sqlOperatorBinding,
RelDataType returnRelDataType) {
super(
dataTypeFactory,
definition,
- binding.getOperator().getNameAsId().toString(),
- binding.getGroupCount() > 0);
+ sqlOperatorBinding.getOperator().getNameAsId().toString(),
+ sqlOperatorBinding.getGroupCount() > 0);
- this.binding = binding;
+ this.binding = new FlinkOperatorBinding(sqlOperatorBinding);
this.argumentDataTypes =
new AbstractList<DataType>() {
@Override
public DataType get(int pos) {
- if (binding instanceof SqlCallBinding) {
- SqlCallBinding sqlCallBinding = (SqlCallBinding)
binding;
- List<SqlNode> operands = sqlCallBinding.operands();
- final RelDataType relDataType =
- sqlCallBinding
- .getValidator()
- .deriveType(
- sqlCallBinding.getScope(),
operands.get(pos));
- final LogicalType logicalType =
-
FlinkTypeFactory.toLogicalType(relDataType);
- return
TypeConversions.fromLogicalToDataType(logicalType);
- } else {
- final LogicalType logicalType =
- toLogicalType(binding.getOperandType(pos));
- return fromLogicalToDataType(logicalType);
- }
+ LogicalType logicalType =
+
FlinkTypeFactory.toLogicalType(binding.getOperandType(pos));
+ return fromLogicalToDataType(logicalType);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
index 5c122ca5038..3d3301d3c48 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.flink.table.types.logical.LogicalType;
@@ -32,6 +34,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperator;
@@ -44,6 +47,7 @@ import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
@@ -101,7 +105,20 @@ public final class TypeInferenceOperandChecker
@Override
public SqlOperandCountRange getOperandCountRange() {
- return countRange;
+ if (typeInference.getOptionalArguments().isPresent()
+ && typeInference.getOptionalArguments().get().stream()
+ .anyMatch(Boolean::booleanValue)) {
+ int notOptionalCount =
+ (int)
+ typeInference.getOptionalArguments().get().stream()
+ .filter(optional -> !optional)
+ .count();
+ ArgumentCount argumentCount =
+ ConstantArgumentCount.between(notOptionalCount,
countRange.getMax());
+ return new ArgumentCountRange(argumentCount);
+ } else {
+ return countRange;
+ }
}
@Override
@@ -116,7 +133,22 @@ public final class TypeInferenceOperandChecker
@Override
public boolean isOptional(int i) {
- return false;
+ Optional<List<Boolean>> optionalArguments =
typeInference.getOptionalArguments();
+ if (optionalArguments.isPresent()) {
+ return optionalArguments.get().get(i);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isFixedParameters() {
+ // This method returns true only if optional arguments are declared
and at least one
+ // optional argument is present.
+ // Otherwise, it defaults to false, bypassing the parameter check in
Calcite.
+ return typeInference.getOptionalArguments().isPresent()
+ && typeInference.getOptionalArguments().get().stream()
+ .anyMatch(Boolean::booleanValue);
}
@Override
@@ -173,6 +205,11 @@ public final class TypeInferenceOperandChecker
final List<SqlNode> operands = callBinding.operands();
for (int i = 0; i < operands.size(); i++) {
final LogicalType expectedType =
expectedDataTypes.get(i).getLogicalType();
+ SqlNode sqlNode = operands.get(i);
+ // skip default node
+ if (sqlNode.getKind() == SqlKind.DEFAULT) {
+ continue;
+ }
final LogicalType argumentType =
toLogicalType(SqlTypeUtil.deriveType(callBinding,
operands.get(i)));
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index c2b3653aa69..f43d1ff80bf 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
@@ -1286,4 +1287,9 @@ public class FlinkSqlOperatorTable extends
ReflectiveSqlOperatorTable {
.operandTypeChecker(OperandTypes.NILADIC)
.notDeterministic()
.build();
+
+ // DEFAULT FUNCTION
+ // The default operator is used to fill in missing parameters when using
named parameter,
+ // which is used during code generation and not exposed to the user by
default.
+ public static final SqlSpecialOperator DEFAULT =
SqlStdOperatorTable.DEFAULT;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index cd8ad87097b..8638efae8b0 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -131,7 +131,9 @@ public class PlannerCallProcedureOperation implements
CallProcedureOperation {
argumentVal[0] = new DefaultProcedureContext(env);
for (int i = 0; i < internalInputArguments.length; i++) {
argumentVal[i + 1] =
- toExternal(internalInputArguments[i], inputTypes[i],
userClassLoader);
+ (internalInputArguments[i] != null)
+ ? toExternal(internalInputArguments[i],
inputTypes[i], userClassLoader)
+ : null;
}
return argumentVal;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
index af73ccfc1e1..8205d52ff7e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
@@ -21,16 +21,17 @@ package
org.apache.flink.table.planner.operations.converters;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure;
import
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation;
import org.apache.flink.table.planner.plan.utils.RexLiteralUtil;
-import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
import org.apache.flink.table.procedures.ProcedureDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.ExplicitOperatorBinding;
@@ -47,6 +48,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter.toRelDataType;
+
/**
* A converter for call procedure node. The call procedure statement will be
parsed to a SqlCall
* wrapping SqlProcedureCallOperator as the operator by calcite. So, this
converter will try to
@@ -70,12 +73,14 @@ public class SqlProcedureCallConverter implements
SqlNodeConverter<SqlNode> {
SqlCallBinding sqlCallBinding =
new SqlCallBinding(context.getSqlValidator(), null,
callProcedure);
+
+ List<RexNode> reducedOperands = reduceOperands(sqlCallBinding,
context);
SqlOperatorBinding sqlOperatorBinding =
new ExplicitOperatorBinding(
context.getSqlValidator().getTypeFactory(),
sqlProcedure,
- sqlCallBinding.operands().stream()
- .map(sqlValidator::getValidatedNodeType)
+ reducedOperands.stream()
+ .map(RexNode::getType)
.collect(Collectors.toList()));
OperatorBindingCallContext bindingCallContext =
@@ -93,8 +98,6 @@ public class SqlProcedureCallConverter implements
SqlNodeConverter<SqlNode> {
context.getCatalogManager().getDataTypeFactory()),
bindingCallContext,
null);
-
- List<RexNode> reducedOperands =
reduceOperands(sqlCallBinding.operands(), context);
List<DataType> argumentTypes =
typeInferResult.getExpectedArgumentTypes();
int argumentCount = argumentTypes.size();
DataType[] inputTypes = new DataType[argumentCount];
@@ -126,19 +129,30 @@ public class SqlProcedureCallConverter implements
SqlNodeConverter<SqlNode> {
typeInferResult.getOutputDataType());
}
- private List<RexNode> reduceOperands(List<SqlNode> sqlNodes,
ConvertContext context) {
+ private List<RexNode> reduceOperands(SqlCallBinding sqlCallBinding,
ConvertContext context) {
// we don't really care about the input row type while converting to
RexNode
// since call procedure shouldn't refer any inputs.
// so, construct an empty row for it.
RelDataType inputRowType =
- LogicalRelDataTypeConverter.toRelDataType(
+ toRelDataType(
DataTypes.ROW().getLogicalType(),
context.getSqlValidator().getTypeFactory());
List<RexNode> rexNodes = new ArrayList<>();
- for (SqlNode sqlNode : sqlNodes) {
- RexNode rexNode = context.toRexNode(sqlNode, inputRowType, null);
- rexNodes.add(rexNode);
+ List<SqlNode> operands = sqlCallBinding.operands();
+ FlinkOperatorBinding flinkOperatorBinding = new
FlinkOperatorBinding(sqlCallBinding);
+ for (int i = 0; i < operands.size(); i++) {
+ RexNode rexNode = context.toRexNode(operands.get(i), inputRowType,
null);
+ if (rexNode.getKind() == SqlKind.DEFAULT) {
+ rexNodes.add(
+ ((RexCall) rexNode)
+ .clone(
+ flinkOperatorBinding.getOperandType(i),
+ ((RexCall) rexNode).operands));
+ } else {
+ rexNodes.add(rexNode);
+ }
}
- return context.reduceRexNodes(rexNodes);
+ rexNodes = context.reduceRexNodes(rexNodes);
+ return rexNodes;
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index a589a495dee..9352a8388ef 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.util.DataFormatConverters
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
GeneratedExpression}
import org.apache.flink.table.planner.codegen.CodeGenUtils._
-import
org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull,
generateCallIfArgsNullable, generateNonNullField,
generateStringResultCallIfArgsNotNull}
+import
org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull,
generateCallIfArgsNullable, generateNonNullField, generateNullLiteral,
generateStringResultCallIfArgsNotNull}
import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
import org.apache.flink.table.runtime.functions.SqlFunctionUtils
@@ -237,6 +237,9 @@ object StringCallGen {
val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
generateNonNullField(returnType, currentDatabase)
+ case DEFAULT =>
+ generateNullLiteral(returnType)
+
case _ => null
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 71190ffc512..126c9643ea5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.factories;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.catalog.Catalog;
@@ -90,6 +91,9 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
PROCEDURE_MAP.put(
ObjectPath.fromString("system.named_args_overload"),
new NamedArgumentsProcedureWithOverload());
+ PROCEDURE_MAP.put(
+ ObjectPath.fromString("system.named_args_optional"),
+ new NamedArgumentsProcedureWithOptionalArguments());
}
public CatalogWithBuiltInProcedure(String name) {
@@ -212,6 +216,20 @@ public class TestProcedureCatalogFactory implements
CatalogFactory {
}
}
+ /** A procedure with named arguments and optional arguments. */
+ public static class NamedArgumentsProcedureWithOptionalArguments
implements Procedure {
+
+ @ProcedureHint(
+ output = @DataTypeHint("STRING"),
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name = "c",
isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("INT"), name = "d",
isOptional = true)
+ })
+ public String[] call(ProcedureContext procedureContext, String arg1,
Integer arg2) {
+ return new String[] {arg1 + ", " + arg2};
+ }
+ }
+
/** A simple pojo class for testing purpose. */
public static class UserPojo {
private final String name;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 9ace5dff76a..f0623717681 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -1074,6 +1074,25 @@ public class FunctionITCase extends StreamingTestBase {
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
}
+ @Test
+ void testNamedArgumentsTableFunctionWithOptionalArguments() throws
Exception {
+ final Row[] sinkData = new Row[] {Row.of("null, str2")};
+
+ TestCollectionTableFactory.reset();
+
+ tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector'
= 'COLLECTION')");
+
+ tEnv().createFunction(
+ "NamedArgumentsTableFunctionWithOptionalArguments",
+
NamedArgumentsTableFunctionWithOptionalArguments.class);
+ tEnv().executeSql(
+ "INSERT INTO SinkTable "
+ + "SELECT T1.s FROM
TABLE(NamedArgumentsTableFunctionWithOptionalArguments(in2 => 'str2')) AS
T1(s)")
+ .await();
+
+
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
+ }
+
@Test
void testNamedArgumentsScalarFunction() throws Exception {
final List<Row> sourceData =
@@ -1124,6 +1143,29 @@ public class FunctionITCase extends StreamingTestBase {
"SQL validation failed. Could not find the argument
names. Currently named arguments are not supported for varArgs and multi
different argument names with overload function");
}
+ @Test
+ void testNamedArgumentsScalarFunctionWithOptionalArguments() throws
Exception {
+ final List<Row> sinkData =
+ Arrays.asList(Row.of("s1: null", "null: s2", "s1: s2", "null:
null"));
+ TestCollectionTableFactory.reset();
+
+ tEnv().executeSql(
+ "CREATE TABLE TestTable(s1 STRING, s2 STRING, s3
STRING, s4 STRING) WITH ('connector' = 'COLLECTION')");
+
+ tEnv().createTemporarySystemFunction(
+ "NamedArgumentsScalarFunctionWithOptionalArguments",
+
NamedArgumentsScalarFunctionWithOptionalArguments.class);
+ tEnv().executeSql(
+ "INSERT INTO TestTable SELECT"
+ + "
NamedArgumentsScalarFunctionWithOptionalArguments(in1 => 's1') as s1,"
+ + "
NamedArgumentsScalarFunctionWithOptionalArguments(in2 => 's2') as s2,"
+ + "
NamedArgumentsScalarFunctionWithOptionalArguments(in1 => 's1', in2 => 's2') as
s3,"
+ + "
NamedArgumentsScalarFunctionWithOptionalArguments() as s4")
+ .await();
+
+ assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
+ }
+
@Test
void testNamedArgumentAggregateFunction() throws Exception {
final List<Row> sourceData =
@@ -1133,7 +1175,8 @@ public class FunctionITCase extends StreamingTestBase {
Row.of(LocalDateTime.parse("2007-12-03T10:15:32"),
"e", "f", 5, 6),
Row.of(LocalDateTime.parse("2007-12-03T10:15:32"),
"gg", "hh", 7, 88));
- final List<Row> sinkData = Arrays.asList(Row.of("a:b", "b:a"),
Row.of("gg:hh", "hh:gg"));
+ final List<Row> sinkData =
+ Arrays.asList(Row.of("a: b", "b: a"), Row.of("gg: hh", "hh:
gg"));
TestCollectionTableFactory.reset();
TestCollectionTableFactory.initData(sourceData);
@@ -1158,6 +1201,42 @@ public class FunctionITCase extends StreamingTestBase {
assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
}
+ @Test
+ void testNamedArgumentAggregateFunctionWithOptionalArguments() throws
Exception {
+ final List<Row> sourceData =
+ Arrays.asList(
+ Row.of(LocalDateTime.parse("2007-12-03T10:15:30"),
"a", "b", 1, 2),
+ Row.of(LocalDateTime.parse("2007-12-03T10:15:30"),
"c", "d", 33, 44),
+ Row.of(LocalDateTime.parse("2007-12-03T10:15:32"),
"e", "f", 5, 6),
+ Row.of(LocalDateTime.parse("2007-12-03T10:15:32"),
"gg", "hh", 7, 88));
+
+ final List<Row> sinkData =
+ Arrays.asList(Row.of("a: null", "null: b"), Row.of("gg: null",
"null: hh"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ tEnv().executeSql(
+ "CREATE TABLE SourceTable(ts TIMESTAMP(3), s1 STRING,
s2 STRING, i1 INT, i2 INT, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) "
+ + "WITH ('connector' = 'COLLECTION')");
+ tEnv().executeSql(
+ "CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH
('connector' = 'COLLECTION')");
+
+ tEnv().createTemporarySystemFunction(
+ "NamedArgumentAggregateFunctionWithOptionalArguments",
+
NamedArgumentAggregateFunctionWithOptionalArguments.class);
+
+ tEnv().executeSql(
+ "INSERT INTO SinkTable "
+ + "SELECT
NamedArgumentAggregateFunctionWithOptionalArguments(in1 => s1), "
+ +
"NamedArgumentAggregateFunctionWithOptionalArguments(in2 => s2) "
+ + "FROM SourceTable "
+ + "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)")
+ .await();
+
+ assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
+ }
+
@Test
public void testInvalidUseOfScalarFunction() {
tEnv().executeSql(
@@ -1527,6 +1606,19 @@ public class FunctionITCase extends StreamingTestBase {
@ArgumentHint(name = "in1", type =
@DataTypeHint("string")),
@ArgumentHint(name = "in2", type = @DataTypeHint("string"))
})
+ public String eval(String arg1, String arg2) {
+ return (arg1 + ":" + arg2);
+ }
+ }
+
+ /** Function with optional arguments. */
+ public static class NamedArgumentsScalarFunctionWithOptionalArguments
extends ScalarFunction {
+ @FunctionHint(
+ output = @DataTypeHint("STRING"),
+ argument = {
+ @ArgumentHint(name = "in1", type =
@DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(name = "in2", type =
@DataTypeHint("STRING"), isOptional = true)
+ })
public String eval(String arg1, String arg2) {
return (arg1 + ": " + arg2);
}
@@ -1659,6 +1751,20 @@ public class FunctionITCase extends StreamingTestBase {
}
}
+ /** Function that returns a string or integer. */
+ public static class NamedArgumentsTableFunctionWithOptionalArguments
+ extends TableFunction<Object> {
+ @FunctionHint(
+ argument = {
+ @ArgumentHint(type = @DataTypeHint("STRING"), name =
"in1", isOptional = true),
+ @ArgumentHint(type = @DataTypeHint("STRING"), name =
"in2", isOptional = true)
+ },
+ output = @DataTypeHint("STRING"))
+ public void eval(String arg1, String arg2) {
+ collect(arg1 + ", " + arg2);
+ }
+ }
+
/**
* Function that returns which method has been called.
*
@@ -1790,7 +1896,37 @@ public class FunctionITCase extends StreamingTestBase {
if (arg1 == null || arg2 == null) {
return;
}
- String value = arg1 + ":" + arg2;
+ String value = arg1 + ": " + arg2;
+ final String longestString = (String) acc.getField(0);
+ if (longestString == null || longestString.length() <
value.length()) {
+ acc.setField(0, value);
+ }
+ }
+
+ @Override
+ public String getValue(Row acc) {
+ return (String) acc.getField(0);
+ }
+ }
+
+ /** Function that aggregates strings and finds the longest string. */
+ public static class NamedArgumentAggregateFunctionWithOptionalArguments
+ extends AggregateFunction<String, Row> {
+
+ @Override
+ public Row createAccumulator() {
+ return Row.of((String) null);
+ }
+
+ @FunctionHint(
+ output = @DataTypeHint("STRING"),
+ argument = {
+ @ArgumentHint(name = "in1", type =
@DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(name = "in2", type =
@DataTypeHint("STRING"), isOptional = true)
+ },
+ accumulator = @DataTypeHint("ROW<longestString STRING>"))
+ public void accumulate(Row acc, String arg1, String arg2) {
+ String value = arg1 + ": " + arg2;
final String longestString = (String) acc.getField(0);
if (longestString == null || longestString.length() <
value.length()) {
acc.setField(0, value);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index fb6a0f1dae5..dc5e2319b4d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -88,7 +88,7 @@ class ProcedureITCase extends StreamingTestBase {
tEnv().executeSql("show procedures in
`system`").collect());
assertThat(rows.toString())
.isEqualTo(
- "[+I[generate_n], +I[generate_user], +I[get_year],
+I[named_args], +I[named_args_overload], +I[sum_n]]");
+ "[+I[generate_n], +I[generate_user], +I[get_year],
+I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
// show procedure with like
rows =
@@ -115,7 +115,8 @@ class ProcedureITCase extends StreamingTestBase {
tEnv().executeSql("show procedures in `system` not
like 'generate%'")
.collect());
assertThat(rows.toString())
- .isEqualTo("[+I[get_year], +I[named_args],
+I[named_args_overload], +I[sum_n]]");
+ .isEqualTo(
+ "[+I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
// show procedure with not ilike
rows =
@@ -123,7 +124,8 @@ class ProcedureITCase extends StreamingTestBase {
tEnv().executeSql("show procedures in `system` not
ilike 'generaTe%'")
.collect());
assertThat(rows.toString())
- .isEqualTo("[+I[get_year], +I[named_args],
+I[named_args_overload], +I[sum_n]]");
+ .isEqualTo(
+ "[+I[get_year], +I[named_args],
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
}
@Test
@@ -200,12 +202,12 @@ class ProcedureITCase extends StreamingTestBase {
}
@Test
- void testNamedArgumentsWithDefaultValue() {
- // default value
- Assertions.assertThatThrownBy(
- () -> tEnv().executeSql("call `system`.named_args(c =>
'yuxia')"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining("No match found for function signature
named_args");
+ void testNamedArgumentsWithOptionalArguments() {
+ TableResult tableResult = tEnv().executeSql("call
`system`.named_args_optional(d => 19)");
+ verifyTableResult(
+ tableResult,
+ Collections.singletonList(Row.of("null, 19")),
+ ResolvedSchema.of(Column.physical("result",
DataTypes.STRING())));
}
private void verifyTableResult(