dawidwys commented on code in PR #25805:
URL: https://github.com/apache/flink/pull/25805#discussion_r1890115731
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
protected final DataTypeFactory typeFactory;
- private final String methodName;
+ protected final String methodName;
private final SignatureExtraction signatureExtraction;
protected final ResultExtraction outputExtraction;
- protected final MethodVerification verification;
+ protected final MethodVerification outputVerification;
public BaseMappingExtractor(
DataTypeFactory typeFactory,
String methodName,
SignatureExtraction signatureExtraction,
ResultExtraction outputExtraction,
- MethodVerification verification) {
+ MethodVerification outputVerification) {
this.typeFactory = typeFactory;
this.methodName = methodName;
this.signatureExtraction = signatureExtraction;
this.outputExtraction = outputExtraction;
- this.verification = verification;
+ this.outputVerification = outputVerification;
+ }
+
+ Map<FunctionSignatureTemplate, FunctionOutputTemplate>
extractOutputMapping() {
+ try {
+ return extractResultMappings(
+ outputExtraction, FunctionTemplate::getOutputTemplate,
outputVerification);
+ } catch (Throwable t) {
+ throw extractionError(t, "Error in extracting a signature to
output mapping.");
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Extraction strategies
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Extraction that uses the method parameters for producing a {@link
FunctionSignatureTemplate}.
+ */
+ static SignatureExtraction createArgumentsFromParametersExtraction(
+ int offset, @Nullable Class<?> contextClass) {
+ return (extractor, method) -> {
+ final List<ArgumentParameter> args =
+ extractArgumentParameters(method, offset, contextClass);
+
+ final EnumSet<StaticArgumentTrait>[] argumentTraits =
extractArgumentTraits(args);
+
+ final List<FunctionArgumentTemplate> argumentTemplates =
+ extractArgumentTemplates(
+ extractor.typeFactory,
extractor.getFunctionClass(), args);
+
+ final String[] argumentNames = extractArgumentNames(method, args);
+
+ final boolean[] argumentOptionals = extractArgumentOptionals(args);
+
+ return FunctionSignatureTemplate.of(
+ argumentTemplates,
+ method.isVarArgs(),
+ argumentTraits,
+ argumentNames,
+ argumentOptionals);
+ };
+ }
+
+ /**
+ * Extraction that uses the method parameters for producing a {@link
FunctionSignatureTemplate}.
+ */
+ static SignatureExtraction createArgumentsFromParametersExtraction(int
offset) {
+ return createArgumentsFromParametersExtraction(offset, null);
+ }
+
+ /** Extraction that uses the method parameters with {@link StateHint} for
state entries. */
+ static ResultExtraction createStateFromParametersExtraction() {
+ return (extractor, method) -> {
+ final List<StateParameter> stateParameters =
extractStateParameters(method);
+ return createStateTemplateFromParameters(extractor, method,
stateParameters);
+ };
+ }
+
+ /**
+ * Extraction that uses a generic type variable for producing a {@link
FunctionStateTemplate}.
+ * Or method parameters with {@link StateHint} for state entries as a
fallback.
+ */
+ static ResultExtraction createStateFromGenericInClassOrParameters(
Review Comment:
```suggestion
static ResultExtraction
createStateFromGenericInClassOrParametersExtraction(
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -320,9 +322,12 @@ public static void validateClassForRuntime(
methods.stream()
.anyMatch(
method ->
- ExtractionUtils.isInvokable(method,
argumentClasses)
+ ExtractionUtils.isInvokable(false,
method, argumentClasses)
&&
ExtractionUtils.isAssignable(
- outputClass,
method.getReturnType(), true));
+ outputClass,
+ method.getReturnType(),
+ true,
+ false));
Review Comment:
Why not `true` here?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -185,101 +154,166 @@ static ResultExtraction
createGenericResultExtractionFromMethod(
method,
paramPos,
genericPos);
- return FunctionResultTemplate.of(dataType);
+ return FunctionResultTemplate.ofOutput(dataType);
};
}
- /** Uses hints to extract functional template. */
- private static Optional<FunctionResultTemplate> extractHints(
- BaseMappingExtractor extractor, Method method) {
- final Set<DataTypeHint> dataTypeHints = new HashSet<>();
- dataTypeHints.addAll(collectAnnotationsOfMethod(DataTypeHint.class,
method));
- dataTypeHints.addAll(
- collectAnnotationsOfClass(DataTypeHint.class,
extractor.getFunctionClass()));
- if (dataTypeHints.size() > 1) {
- throw extractionError(
- "More than one data type hint found for output of
function. "
- + "Please use a function hint instead.");
- }
- if (dataTypeHints.size() == 1) {
- return Optional.ofNullable(
- FunctionTemplate.createResultTemplate(
- extractor.typeFactory,
dataTypeHints.iterator().next()));
- }
- // otherwise continue with regular extraction
- return Optional.empty();
- }
+ //
--------------------------------------------------------------------------------------------
+ // Verification strategies
+ //
--------------------------------------------------------------------------------------------
- /** Verification that checks a method by parameters and return type. */
+ /** Verification that checks a method by parameters (arguments only) and
return type. */
static MethodVerification createParameterAndReturnTypeVerification() {
- return (method, signature, result) -> {
- final Class<?>[] parameters = signature.toArray(new Class[0]);
+ return (method, state, arguments, result) -> {
+ checkNoState(state);
+ final Class<?>[] parameters = assembleParameters(state, arguments);
final Class<?> returnType = method.getReturnType();
+ // TODO enable strict autoboxing
Review Comment:
Can we fix this TODO?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java:
##########
@@ -936,10 +947,11 @@ public void visitLocalVariable(
* @param cls the Class to check, may be null
* @param toClass the Class to try to assign into, returns false if null
* @param autoboxing whether to use implicit autoboxing/unboxing between
primitives and wrappers
+ * @param strictAutoboxing checks whether null would end up in a primitive
type and forbids it
* @return {@code true} if assignment possible
*/
public static boolean isAssignable(
- Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
+ Class<?> cls, final Class<?> toClass, boolean autoboxing, boolean
strictAutoboxing) {
Review Comment:
Can we be consistent with `final`? I believe both `autoboxing` and
`strictAutoboxing` can be `final`.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java:
##########
@@ -171,12 +171,13 @@ private Object callProcedure(Procedure procedure,
Class<?>[] inputClz, Object[]
methods.stream()
.filter(
method ->
- ExtractionUtils.isInvokable(method,
inputClz)
+ ExtractionUtils.isInvokable(false,
method, inputClz)
Review Comment:
Why `false`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
protected final DataTypeFactory typeFactory;
- private final String methodName;
+ protected final String methodName;
private final SignatureExtraction signatureExtraction;
protected final ResultExtraction outputExtraction;
- protected final MethodVerification verification;
+ protected final MethodVerification outputVerification;
public BaseMappingExtractor(
DataTypeFactory typeFactory,
String methodName,
SignatureExtraction signatureExtraction,
ResultExtraction outputExtraction,
- MethodVerification verification) {
+ MethodVerification outputVerification) {
this.typeFactory = typeFactory;
this.methodName = methodName;
this.signatureExtraction = signatureExtraction;
this.outputExtraction = outputExtraction;
- this.verification = verification;
+ this.outputVerification = outputVerification;
+ }
+
+ Map<FunctionSignatureTemplate, FunctionOutputTemplate>
extractOutputMapping() {
+ try {
+ return extractResultMappings(
+ outputExtraction, FunctionTemplate::getOutputTemplate,
outputVerification);
+ } catch (Throwable t) {
+ throw extractionError(t, "Error in extracting a signature to
output mapping.");
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Extraction strategies
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Extraction that uses the method parameters for producing a {@link
FunctionSignatureTemplate}.
+ */
+ static SignatureExtraction createArgumentsFromParametersExtraction(
+ int offset, @Nullable Class<?> contextClass) {
+ return (extractor, method) -> {
+ final List<ArgumentParameter> args =
+ extractArgumentParameters(method, offset, contextClass);
+
+ final EnumSet<StaticArgumentTrait>[] argumentTraits =
extractArgumentTraits(args);
+
+ final List<FunctionArgumentTemplate> argumentTemplates =
+ extractArgumentTemplates(
+ extractor.typeFactory,
extractor.getFunctionClass(), args);
+
+ final String[] argumentNames = extractArgumentNames(method, args);
+
+ final boolean[] argumentOptionals = extractArgumentOptionals(args);
+
+ return FunctionSignatureTemplate.of(
+ argumentTemplates,
+ method.isVarArgs(),
+ argumentTraits,
+ argumentNames,
+ argumentOptionals);
+ };
+ }
+
+ /**
+ * Extraction that uses the method parameters for producing a {@link
FunctionSignatureTemplate}.
+ */
+ static SignatureExtraction createArgumentsFromParametersExtraction(int
offset) {
Review Comment:
Could you add a description for `offset`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
protected final DataTypeFactory typeFactory;
- private final String methodName;
+ protected final String methodName;
private final SignatureExtraction signatureExtraction;
protected final ResultExtraction outputExtraction;
- protected final MethodVerification verification;
+ protected final MethodVerification outputVerification;
public BaseMappingExtractor(
DataTypeFactory typeFactory,
String methodName,
SignatureExtraction signatureExtraction,
ResultExtraction outputExtraction,
- MethodVerification verification) {
+ MethodVerification outputVerification) {
this.typeFactory = typeFactory;
this.methodName = methodName;
this.signatureExtraction = signatureExtraction;
this.outputExtraction = outputExtraction;
- this.verification = verification;
+ this.outputVerification = outputVerification;
+ }
+
+ Map<FunctionSignatureTemplate, FunctionOutputTemplate>
extractOutputMapping() {
+ try {
+ return extractResultMappings(
+ outputExtraction, FunctionTemplate::getOutputTemplate,
outputVerification);
+ } catch (Throwable t) {
+ throw extractionError(t, "Error in extracting a signature to
output mapping.");
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Extraction strategies
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Extraction that uses the method parameters for producing a {@link
FunctionSignatureTemplate}.
+ */
+ static SignatureExtraction createArgumentsFromParametersExtraction(
+ int offset, @Nullable Class<?> contextClass) {
Review Comment:
Could you add a description of what the `offset` is?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -185,101 +154,166 @@ static ResultExtraction
createGenericResultExtractionFromMethod(
method,
paramPos,
genericPos);
- return FunctionResultTemplate.of(dataType);
+ return FunctionResultTemplate.ofOutput(dataType);
};
}
- /** Uses hints to extract functional template. */
- private static Optional<FunctionResultTemplate> extractHints(
- BaseMappingExtractor extractor, Method method) {
- final Set<DataTypeHint> dataTypeHints = new HashSet<>();
- dataTypeHints.addAll(collectAnnotationsOfMethod(DataTypeHint.class,
method));
- dataTypeHints.addAll(
- collectAnnotationsOfClass(DataTypeHint.class,
extractor.getFunctionClass()));
- if (dataTypeHints.size() > 1) {
- throw extractionError(
- "More than one data type hint found for output of
function. "
- + "Please use a function hint instead.");
- }
- if (dataTypeHints.size() == 1) {
- return Optional.ofNullable(
- FunctionTemplate.createResultTemplate(
- extractor.typeFactory,
dataTypeHints.iterator().next()));
- }
- // otherwise continue with regular extraction
- return Optional.empty();
- }
+ //
--------------------------------------------------------------------------------------------
+ // Verification strategies
+ //
--------------------------------------------------------------------------------------------
- /** Verification that checks a method by parameters and return type. */
+ /** Verification that checks a method by parameters (arguments only) and
return type. */
static MethodVerification createParameterAndReturnTypeVerification() {
- return (method, signature, result) -> {
- final Class<?>[] parameters = signature.toArray(new Class[0]);
+ return (method, state, arguments, result) -> {
+ checkNoState(state);
+ final Class<?>[] parameters = assembleParameters(state, arguments);
final Class<?> returnType = method.getReturnType();
+ // TODO enable strict autoboxing
final boolean isValid =
- isInvokable(method, parameters) && isAssignable(result,
returnType, true);
+ isInvokable(false, method, parameters)
+ && isAssignable(result, returnType, true, false);
if (!isValid) {
- throw createMethodNotFoundError(method.getName(), parameters,
result);
+ throw createMethodNotFoundError(method.getName(), parameters,
result, "");
}
};
}
- /** Verification that checks a method by parameters including an
accumulator. */
- static MethodVerification createParameterWithAccumulatorVerification() {
- return (method, signature, result) -> {
- if (result != null) {
- // ignore the accumulator in the first argument
- createParameterWithArgumentVerification(null).verify(method,
signature, result);
+ /** Verification that checks a method by parameters (arguments only or
with accumulator). */
+ static MethodVerification createParameterVerification(boolean
requireAccumulator) {
+ return (method, state, arguments, result) -> {
+ if (requireAccumulator) {
+ checkSingleState(state);
} else {
- // check the signature only
- createParameterVerification().verify(method, signature, null);
+ checkNoState(state);
}
- };
- }
-
- /** Verification that checks a method by parameters including an
additional first parameter. */
- static MethodVerification createParameterWithArgumentVerification(
- @Nullable Class<?> argumentClass) {
- return (method, signature, result) -> {
- final Class<?>[] parameters =
- Stream.concat(Stream.of(argumentClass), signature.stream())
- .toArray(Class<?>[]::new);
- if (!isInvokable(method, parameters)) {
- throw createMethodNotFoundError(method.getName(), parameters,
null);
+ final Class<?>[] parameters = assembleParameters(state, arguments);
+ // TODO enable strict autoboxing
Review Comment:
Can we fix this?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/CountAggFunction.scala:
##########
@@ -42,7 +42,7 @@ class CountAggFunction extends AggregateFunction[JLong,
CountAccumulator] {
acc.f0 += 1L
}
- def retract(acc: CountAccumulator, @DataTypeHint("INT") value: Any): Unit = {
+ def retract(acc: CountAccumulator, value: Any): Unit = {
Review Comment:
Is this change mandatory or is the annotation just no longer necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]