dawidwys commented on code in PR #25805:
URL: https://github.com/apache/flink/pull/25805#discussion_r1890115731


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
 
     protected final DataTypeFactory typeFactory;
 
-    private final String methodName;
+    protected final String methodName;
 
     private final SignatureExtraction signatureExtraction;
 
     protected final ResultExtraction outputExtraction;
 
-    protected final MethodVerification verification;
+    protected final MethodVerification outputVerification;
 
     public BaseMappingExtractor(
             DataTypeFactory typeFactory,
             String methodName,
             SignatureExtraction signatureExtraction,
             ResultExtraction outputExtraction,
-            MethodVerification verification) {
+            MethodVerification outputVerification) {
         this.typeFactory = typeFactory;
         this.methodName = methodName;
         this.signatureExtraction = signatureExtraction;
         this.outputExtraction = outputExtraction;
-        this.verification = verification;
+        this.outputVerification = outputVerification;
+    }
+
+    Map<FunctionSignatureTemplate, FunctionOutputTemplate> 
extractOutputMapping() {
+        try {
+            return extractResultMappings(
+                    outputExtraction, FunctionTemplate::getOutputTemplate, 
outputVerification);
+        } catch (Throwable t) {
+            throw extractionError(t, "Error in extracting a signature to 
output mapping.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Extraction strategies
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Extraction that uses the method parameters for producing a {@link 
FunctionSignatureTemplate}.
+     */
+    static SignatureExtraction createArgumentsFromParametersExtraction(
+            int offset, @Nullable Class<?> contextClass) {
+        return (extractor, method) -> {
+            final List<ArgumentParameter> args =
+                    extractArgumentParameters(method, offset, contextClass);
+
+            final EnumSet<StaticArgumentTrait>[] argumentTraits = 
extractArgumentTraits(args);
+
+            final List<FunctionArgumentTemplate> argumentTemplates =
+                    extractArgumentTemplates(
+                            extractor.typeFactory, 
extractor.getFunctionClass(), args);
+
+            final String[] argumentNames = extractArgumentNames(method, args);
+
+            final boolean[] argumentOptionals = extractArgumentOptionals(args);
+
+            return FunctionSignatureTemplate.of(
+                    argumentTemplates,
+                    method.isVarArgs(),
+                    argumentTraits,
+                    argumentNames,
+                    argumentOptionals);
+        };
+    }
+
+    /**
+     * Extraction that uses the method parameters for producing a {@link 
FunctionSignatureTemplate}.
+     */
+    static SignatureExtraction createArgumentsFromParametersExtraction(int 
offset) {
+        return createArgumentsFromParametersExtraction(offset, null);
+    }
+
+    /** Extraction that uses the method parameters with {@link StateHint} for 
state entries. */
+    static ResultExtraction createStateFromParametersExtraction() {
+        return (extractor, method) -> {
+            final List<StateParameter> stateParameters = 
extractStateParameters(method);
+            return createStateTemplateFromParameters(extractor, method, 
stateParameters);
+        };
+    }
+
+    /**
+     * Extraction that uses a generic type variable for producing a {@link 
FunctionStateTemplate}.
+     * Or method parameters with {@link StateHint} for state entries as a 
fallback.
+     */
+    static ResultExtraction createStateFromGenericInClassOrParameters(

Review Comment:
   ```suggestion
       static ResultExtraction 
createStateFromGenericInClassOrParametersExtraction(
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -320,9 +322,12 @@ public static void validateClassForRuntime(
                 methods.stream()
                         .anyMatch(
                                 method ->
-                                        ExtractionUtils.isInvokable(method, 
argumentClasses)
+                                        ExtractionUtils.isInvokable(false, 
method, argumentClasses)
                                                 && 
ExtractionUtils.isAssignable(
-                                                        outputClass, 
method.getReturnType(), true));
+                                                        outputClass,
+                                                        method.getReturnType(),
+                                                        true,
+                                                        false));

Review Comment:
   Why not `true` here?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -185,101 +154,166 @@ static ResultExtraction 
createGenericResultExtractionFromMethod(
                             method,
                             paramPos,
                             genericPos);
-            return FunctionResultTemplate.of(dataType);
+            return FunctionResultTemplate.ofOutput(dataType);
         };
     }
 
-    /** Uses hints to extract functional template. */
-    private static Optional<FunctionResultTemplate> extractHints(
-            BaseMappingExtractor extractor, Method method) {
-        final Set<DataTypeHint> dataTypeHints = new HashSet<>();
-        dataTypeHints.addAll(collectAnnotationsOfMethod(DataTypeHint.class, 
method));
-        dataTypeHints.addAll(
-                collectAnnotationsOfClass(DataTypeHint.class, 
extractor.getFunctionClass()));
-        if (dataTypeHints.size() > 1) {
-            throw extractionError(
-                    "More than one data type hint found for output of 
function. "
-                            + "Please use a function hint instead.");
-        }
-        if (dataTypeHints.size() == 1) {
-            return Optional.ofNullable(
-                    FunctionTemplate.createResultTemplate(
-                            extractor.typeFactory, 
dataTypeHints.iterator().next()));
-        }
-        // otherwise continue with regular extraction
-        return Optional.empty();
-    }
+    // 
--------------------------------------------------------------------------------------------
+    // Verification strategies
+    // 
--------------------------------------------------------------------------------------------
 
-    /** Verification that checks a method by parameters and return type. */
+    /** Verification that checks a method by parameters (arguments only) and 
return type. */
     static MethodVerification createParameterAndReturnTypeVerification() {
-        return (method, signature, result) -> {
-            final Class<?>[] parameters = signature.toArray(new Class[0]);
+        return (method, state, arguments, result) -> {
+            checkNoState(state);
+            final Class<?>[] parameters = assembleParameters(state, arguments);
             final Class<?> returnType = method.getReturnType();
+            // TODO enable strict autoboxing

Review Comment:
   Can we fix this TODO?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java:
##########
@@ -936,10 +947,11 @@ public void visitLocalVariable(
      * @param cls the Class to check, may be null
      * @param toClass the Class to try to assign into, returns false if null
      * @param autoboxing whether to use implicit autoboxing/unboxing between 
primitives and wrappers
+     * @param strictAutoboxing checks whether null would end up in a primitive 
type and forbids it
      * @return {@code true} if assignment possible
      */
     public static boolean isAssignable(
-            Class<?> cls, final Class<?> toClass, final boolean autoboxing) {
+            Class<?> cls, final Class<?> toClass, boolean autoboxing, boolean 
strictAutoboxing) {

Review Comment:
   Can we be consistent with `final`? I believe both `autoboxing` and 
`strictAutoboxing` can be declared `final`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java:
##########
@@ -171,12 +171,13 @@ private Object callProcedure(Procedure procedure, 
Class<?>[] inputClz, Object[]
                 methods.stream()
                         .filter(
                                 method ->
-                                        ExtractionUtils.isInvokable(method, 
inputClz)
+                                        ExtractionUtils.isInvokable(false, 
method, inputClz)

Review Comment:
   Why `false` here?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
 
     protected final DataTypeFactory typeFactory;
 
-    private final String methodName;
+    protected final String methodName;
 
     private final SignatureExtraction signatureExtraction;
 
     protected final ResultExtraction outputExtraction;
 
-    protected final MethodVerification verification;
+    protected final MethodVerification outputVerification;
 
     public BaseMappingExtractor(
             DataTypeFactory typeFactory,
             String methodName,
             SignatureExtraction signatureExtraction,
             ResultExtraction outputExtraction,
-            MethodVerification verification) {
+            MethodVerification outputVerification) {
         this.typeFactory = typeFactory;
         this.methodName = methodName;
         this.signatureExtraction = signatureExtraction;
         this.outputExtraction = outputExtraction;
-        this.verification = verification;
+        this.outputVerification = outputVerification;
+    }
+
+    Map<FunctionSignatureTemplate, FunctionOutputTemplate> 
extractOutputMapping() {
+        try {
+            return extractResultMappings(
+                    outputExtraction, FunctionTemplate::getOutputTemplate, 
outputVerification);
+        } catch (Throwable t) {
+            throw extractionError(t, "Error in extracting a signature to 
output mapping.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Extraction strategies
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Extraction that uses the method parameters for producing a {@link 
FunctionSignatureTemplate}.
+     */
+    static SignatureExtraction createArgumentsFromParametersExtraction(
+            int offset, @Nullable Class<?> contextClass) {
+        return (extractor, method) -> {
+            final List<ArgumentParameter> args =
+                    extractArgumentParameters(method, offset, contextClass);
+
+            final EnumSet<StaticArgumentTrait>[] argumentTraits = 
extractArgumentTraits(args);
+
+            final List<FunctionArgumentTemplate> argumentTemplates =
+                    extractArgumentTemplates(
+                            extractor.typeFactory, 
extractor.getFunctionClass(), args);
+
+            final String[] argumentNames = extractArgumentNames(method, args);
+
+            final boolean[] argumentOptionals = extractArgumentOptionals(args);
+
+            return FunctionSignatureTemplate.of(
+                    argumentTemplates,
+                    method.isVarArgs(),
+                    argumentTraits,
+                    argumentNames,
+                    argumentOptionals);
+        };
+    }
+
+    /**
+     * Extraction that uses the method parameters for producing a {@link 
FunctionSignatureTemplate}.
+     */
+    static SignatureExtraction createArgumentsFromParametersExtraction(int 
offset) {

Review Comment:
   Could you add a description for `offset`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java:
##########
@@ -65,27 +74,110 @@ abstract class BaseMappingExtractor {
 
     protected final DataTypeFactory typeFactory;
 
-    private final String methodName;
+    protected final String methodName;
 
     private final SignatureExtraction signatureExtraction;
 
     protected final ResultExtraction outputExtraction;
 
-    protected final MethodVerification verification;
+    protected final MethodVerification outputVerification;
 
     public BaseMappingExtractor(
             DataTypeFactory typeFactory,
             String methodName,
             SignatureExtraction signatureExtraction,
             ResultExtraction outputExtraction,
-            MethodVerification verification) {
+            MethodVerification outputVerification) {
         this.typeFactory = typeFactory;
         this.methodName = methodName;
         this.signatureExtraction = signatureExtraction;
         this.outputExtraction = outputExtraction;
-        this.verification = verification;
+        this.outputVerification = outputVerification;
+    }
+
+    Map<FunctionSignatureTemplate, FunctionOutputTemplate> 
extractOutputMapping() {
+        try {
+            return extractResultMappings(
+                    outputExtraction, FunctionTemplate::getOutputTemplate, 
outputVerification);
+        } catch (Throwable t) {
+            throw extractionError(t, "Error in extracting a signature to 
output mapping.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Extraction strategies
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Extraction that uses the method parameters for producing a {@link 
FunctionSignatureTemplate}.
+     */
+    static SignatureExtraction createArgumentsFromParametersExtraction(
+            int offset, @Nullable Class<?> contextClass) {

Review Comment:
   Could you add a description of what the `offset` is?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -185,101 +154,166 @@ static ResultExtraction 
createGenericResultExtractionFromMethod(
                             method,
                             paramPos,
                             genericPos);
-            return FunctionResultTemplate.of(dataType);
+            return FunctionResultTemplate.ofOutput(dataType);
         };
     }
 
-    /** Uses hints to extract functional template. */
-    private static Optional<FunctionResultTemplate> extractHints(
-            BaseMappingExtractor extractor, Method method) {
-        final Set<DataTypeHint> dataTypeHints = new HashSet<>();
-        dataTypeHints.addAll(collectAnnotationsOfMethod(DataTypeHint.class, 
method));
-        dataTypeHints.addAll(
-                collectAnnotationsOfClass(DataTypeHint.class, 
extractor.getFunctionClass()));
-        if (dataTypeHints.size() > 1) {
-            throw extractionError(
-                    "More than one data type hint found for output of 
function. "
-                            + "Please use a function hint instead.");
-        }
-        if (dataTypeHints.size() == 1) {
-            return Optional.ofNullable(
-                    FunctionTemplate.createResultTemplate(
-                            extractor.typeFactory, 
dataTypeHints.iterator().next()));
-        }
-        // otherwise continue with regular extraction
-        return Optional.empty();
-    }
+    // 
--------------------------------------------------------------------------------------------
+    // Verification strategies
+    // 
--------------------------------------------------------------------------------------------
 
-    /** Verification that checks a method by parameters and return type. */
+    /** Verification that checks a method by parameters (arguments only) and 
return type. */
     static MethodVerification createParameterAndReturnTypeVerification() {
-        return (method, signature, result) -> {
-            final Class<?>[] parameters = signature.toArray(new Class[0]);
+        return (method, state, arguments, result) -> {
+            checkNoState(state);
+            final Class<?>[] parameters = assembleParameters(state, arguments);
             final Class<?> returnType = method.getReturnType();
+            // TODO enable strict autoboxing
             final boolean isValid =
-                    isInvokable(method, parameters) && isAssignable(result, 
returnType, true);
+                    isInvokable(false, method, parameters)
+                            && isAssignable(result, returnType, true, false);
             if (!isValid) {
-                throw createMethodNotFoundError(method.getName(), parameters, 
result);
+                throw createMethodNotFoundError(method.getName(), parameters, 
result, "");
             }
         };
     }
 
-    /** Verification that checks a method by parameters including an 
accumulator. */
-    static MethodVerification createParameterWithAccumulatorVerification() {
-        return (method, signature, result) -> {
-            if (result != null) {
-                // ignore the accumulator in the first argument
-                createParameterWithArgumentVerification(null).verify(method, 
signature, result);
+    /** Verification that checks a method by parameters (arguments only or 
with accumulator). */
+    static MethodVerification createParameterVerification(boolean 
requireAccumulator) {
+        return (method, state, arguments, result) -> {
+            if (requireAccumulator) {
+                checkSingleState(state);
             } else {
-                // check the signature only
-                createParameterVerification().verify(method, signature, null);
+                checkNoState(state);
             }
-        };
-    }
-
-    /** Verification that checks a method by parameters including an 
additional first parameter. */
-    static MethodVerification createParameterWithArgumentVerification(
-            @Nullable Class<?> argumentClass) {
-        return (method, signature, result) -> {
-            final Class<?>[] parameters =
-                    Stream.concat(Stream.of(argumentClass), signature.stream())
-                            .toArray(Class<?>[]::new);
-            if (!isInvokable(method, parameters)) {
-                throw createMethodNotFoundError(method.getName(), parameters, 
null);
+            final Class<?>[] parameters = assembleParameters(state, arguments);
+            // TODO enable strict autoboxing

Review Comment:
   Can we fix this TODO?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/CountAggFunction.scala:
##########
@@ -42,7 +42,7 @@ class CountAggFunction extends AggregateFunction[JLong, 
CountAccumulator] {
     acc.f0 += 1L
   }
 
-  def retract(acc: CountAccumulator, @DataTypeHint("INT") value: Any): Unit = {
+  def retract(acc: CountAccumulator, value: Any): Unit = {

Review Comment:
   Is this change mandatory or is the annotation just no longer necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to