dawidwys commented on a change in pull request #10606: 
[FLINK-15009][table-common] Add a utility for creating type inference logic via 
reflection
URL: https://github.com/apache/flink/pull/10606#discussion_r363734617
 
 

 ##########
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
 ##########
 @@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.extraction;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.DataTypeLookup;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.extraction.utils.ExtractionUtils;
+import org.apache.flink.table.types.extraction.utils.FunctionArgumentTemplate;
+import org.apache.flink.table.types.extraction.utils.FunctionResultTemplate;
+import org.apache.flink.table.types.extraction.utils.FunctionSignatureTemplate;
+import org.apache.flink.table.types.extraction.utils.FunctionTemplate;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.inference.TypeStrategy;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.collectAnnotationsOfClass;
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.collectAnnotationsOfMethod;
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.collectMethods;
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.extractionError;
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.isAssignable;
+import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.isMethodInvokable;
+
+/**
+ * Reflection-based utility for extracting a {@link TypeInference} from a 
supported subclass of
+ * {@link UserDefinedFunction}.
+ *
+ * <p>The behavior of this utility can be influenced by {@link DataTypeHint}s 
and {@link FunctionHint}s
+ * which have higher precedence than the reflective information.
+ *
+ * <p>Note: This utility assumes that the accessibility of the function's class and methods as well as
+ * its serializability have been validated beforehand.
+ */
+@Internal
+public final class TypeInferenceExtractor {
+
+       /** Lookup passed to the {@code DataTypeExtractor} and {@code FunctionTemplate} calls of this class. */
+       private final DataTypeLookup lookup;
+
+       /** Function class whose methods and {@link FunctionHint}s are inspected. */
+       private final Class<? extends UserDefinedFunction> function;
+
+       /** Kind of function (e.g. "scalar"); used in the top-level error message. */
+       private final String functionExplanation;
+
+       /** Name of the implementation methods to collect (e.g. "eval" or "accumulate"). */
+       private final String methodName;
+
+       /**
+        * Private constructor; the static {@code for*Function} factories create the instances.
+        */
+       private TypeInferenceExtractor(
+                       DataTypeLookup lookup,
+                       Class<? extends UserDefinedFunction> function,
+                       String functionExplanation,
+                       String methodName) {
+               this.lookup = lookup;
+               this.function = function;
+               this.functionExplanation = functionExplanation;
+               this.methodName = methodName;
+       }
+
+       /**
+        * Extracts a type inference from a {@link ScalarFunction}.
+        *
+        * <p>Implementation methods are named {@code eval}; all parameters form the signature and the
+        * method return type provides the output. No accumulator is extracted.
+        */
+       public static TypeInference forScalarFunction(DataTypeLookup lookup, Class<? extends ScalarFunction> function) {
+               final TypeInferenceExtractor scalarExtractor = new TypeInferenceExtractor(lookup, function, "scalar", "eval");
+               final SignatureExtraction signatures = scalarExtractor.createParameterSignatureExtraction(false);
+               final ResultExtraction outputs = scalarExtractor.createReturnTypeResultExtraction();
+               final MethodVerification checks = scalarExtractor.createParameterAndReturnTypeVerification();
+               return scalarExtractor.extractTypeInference(signatures, null, outputs, checks);
+       }
+
+       /**
+        * Extracts a type inference from an {@link AggregateFunction}.
+        *
+        * <p>Implementation methods are named {@code accumulate}; their first parameter is excluded from
+        * the signature. The accumulator is taken from generic position 1 and the output from generic
+        * position 0 of {@link AggregateFunction}.
+        */
+       public static TypeInference forAggregateFunction(DataTypeLookup lookup, Class<? extends AggregateFunction> function) {
+               final TypeInferenceExtractor aggregateExtractor = new TypeInferenceExtractor(lookup, function, "aggregate", "accumulate");
+               final SignatureExtraction signatures = aggregateExtractor.createParameterSignatureExtraction(true);
+               final ResultExtraction accumulators = aggregateExtractor.createGenericResultExtraction(AggregateFunction.class, 1);
+               final ResultExtraction outputs = aggregateExtractor.createGenericResultExtraction(AggregateFunction.class, 0);
+               final MethodVerification checks = aggregateExtractor.createParameterWithAccumulatorVerification();
+               return aggregateExtractor.extractTypeInference(signatures, accumulators, outputs, checks);
+       }
+
+       /**
+        * Extracts a type inference from a {@link TableFunction}.
+        *
+        * <p>Implementation methods are named {@code eval}; all parameters form the signature. The output
+        * is taken from generic position 0 of {@link TableFunction}. No accumulator is extracted.
+        */
+       public static TypeInference forTableFunction(DataTypeLookup lookup, Class<? extends TableFunction> function) {
+               final TypeInferenceExtractor tableExtractor = new TypeInferenceExtractor(lookup, function, "table", "eval");
+               final SignatureExtraction signatures = tableExtractor.createParameterSignatureExtraction(false);
+               final ResultExtraction outputs = tableExtractor.createGenericResultExtraction(TableFunction.class, 0);
+               final MethodVerification checks = tableExtractor.createParameterVerification();
+               return tableExtractor.extractTypeInference(signatures, null, outputs, checks);
+       }
+
+       /**
+        * Extracts a type inference from a {@link TableAggregateFunction}.
+        *
+        * <p>Implementation methods are named {@code accumulate}; their first parameter is excluded from
+        * the signature. The accumulator is taken from generic position 1 and the output from generic
+        * position 0 of {@link TableAggregateFunction}.
+        */
+       public static TypeInference forTableAggregateFunction(DataTypeLookup lookup, Class<? extends TableAggregateFunction> function) {
+               final TypeInferenceExtractor tableAggregateExtractor = new TypeInferenceExtractor(lookup, function, "table aggregate", "accumulate");
+               final SignatureExtraction signatures = tableAggregateExtractor.createParameterSignatureExtraction(true);
+               final ResultExtraction accumulators = tableAggregateExtractor.createGenericResultExtraction(TableAggregateFunction.class, 1);
+               final ResultExtraction outputs = tableAggregateExtractor.createGenericResultExtraction(TableAggregateFunction.class, 0);
+               final MethodVerification checks = tableAggregateExtractor.createParameterWithAccumulatorVerification();
+               return tableAggregateExtractor.extractTypeInference(signatures, accumulators, outputs, checks);
+       }
+
+       /**
+        * Extracts a type inference from an {@link AsyncTableFunction}.
+        *
+        * <p>Implementation methods are named {@code eval}; their first parameter is excluded from the
+        * signature and is verified as a {@link CompletableFuture}. The output is taken from generic
+        * position 0 of {@link AsyncTableFunction}. No accumulator is extracted.
+        */
+       public static TypeInference forAsyncTableFunction(DataTypeLookup lookup, Class<? extends AsyncTableFunction> function) {
+               final TypeInferenceExtractor asyncExtractor = new TypeInferenceExtractor(lookup, function, "async table", "eval");
+               final SignatureExtraction signatures = asyncExtractor.createParameterSignatureExtraction(true);
+               final ResultExtraction outputs = asyncExtractor.createGenericResultExtraction(AsyncTableFunction.class, 0);
+               final MethodVerification checks = asyncExtractor.createParameterWithArgumentVerification(CompletableFuture.class);
+               return asyncExtractor.extractTypeInference(signatures, null, outputs, checks);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Context sensitive extraction and verification logic
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Creates an extraction that derives a {@link FunctionSignatureTemplate} from the parameters of a method.
+        *
+        * @param excludeFirstArg whether the first method parameter is skipped, both for types and names
+        */
+       private SignatureExtraction createParameterSignatureExtraction(boolean excludeFirstArg) {
+               final int offset = excludeFirstArg ? 1 : 0;
+               return method -> {
+                       final int parameterCount = method.getParameterCount();
+
+                       // one argument template per remaining parameter
+                       final List<FunctionArgumentTemplate> argumentTemplates = IntStream.range(offset, parameterCount)
+                               .mapToObj(pos -> {
+                                       final DataType extracted = DataTypeExtractor.extractFromMethodParameter(lookup, function, method, pos);
+                                       // a trailing varargs parameter is represented by its element type instead of the ARRAY type
+                                       final boolean isVarArgsPosition = method.isVarArgs() && pos == parameterCount - 1;
+                                       if (isVarArgsPosition && extracted instanceof CollectionDataType) {
+                                               return ((CollectionDataType) extracted).getElementDataType();
+                                       }
+                                       return extracted;
+                               })
+                               .map(FunctionArgumentTemplate::of)
+                               .collect(Collectors.toList());
+
+                       // names are optional; skip the same leading parameter as for the types
+                       final List<String> allNames = ExtractionUtils.extractMethodParameterNames(method);
+                       final String[] argumentNames = allNames == null
+                               ? null
+                               : allNames.subList(offset, allNames.size()).toArray(new String[0]);
+
+                       return FunctionSignatureTemplate.of(argumentTemplates, method.isVarArgs(), argumentNames);
+               };
+       }
+
+       /**
+        * Creates an extraction that derives a {@link FunctionResultTemplate} from a generic type variable.
+        *
+        * <p>The method passed to the extraction is ignored; only the function class is inspected.
+        *
+        * @param baseClass class that declares the generic type variable
+        * @param genericPos position of the type variable in {@code baseClass}
+        */
+       private ResultExtraction createGenericResultExtraction(
+                       Class<? extends UserDefinedFunction> baseClass,
+                       int genericPos) {
+               return ignoredMethod -> {
+                       final DataType genericType = DataTypeExtractor.extractFromGeneric(lookup, baseClass, genericPos, function);
+                       final FunctionResultTemplate template = FunctionResultTemplate.of(genericType);
+                       return template;
+               };
+       }
+
+       /**
+        * Creates an extraction that derives a {@link FunctionResultTemplate} from the output of a method.
+        */
+       private ResultExtraction createReturnTypeResultExtraction() {
+               return implementationMethod -> {
+                       final DataType outputType = DataTypeExtractor.extractFromMethodOutput(lookup, function, implementationMethod);
+                       final FunctionResultTemplate template = FunctionResultTemplate.of(outputType);
+                       return template;
+               };
+       }
+
+       /**
+        * Creates a verification that checks both the parameters and the return type of a method.
+        *
+        * <p>The verification throws the error of {@code createMethodNotFoundError} if the method cannot
+        * be invoked with the signature or if the output is not assignable to the method's return type.
+        */
+       private MethodVerification createParameterAndReturnTypeVerification() {
+               return (method, signature, accumulator, output) -> {
+                       final Class<?>[] expectedParameters = signature.toArray(new Class[0]);
+                       final Class<?> declaredReturnType = method.getReturnType();
+                       if (!isMethodInvokable(method, expectedParameters) || !isAssignable(output, declaredReturnType, true)) {
+                               throw createMethodNotFoundError(expectedParameters, declaredReturnType);
+                       }
+               };
+       }
+
+       /**
+        * Verification that checks a method by parameters including an 
accumulator.
+        */
+       private MethodVerification createParameterWithAccumulatorVerification() 
{
+               return (method, signature, accumulator, output) ->
+                       createParameterWithArgumentVerification(accumulator)
+                               .verify(method, signature, accumulator, output);
+       }
+
+       /**
+        * Creates a verification that checks the parameters of a method with an additional first parameter.
+        *
+        * @param argumentClass class expected at the first position; may be {@code null}, in which case a
+        *     {@code null} entry is put at that position
+        */
+       private MethodVerification createParameterWithArgumentVerification(@Nullable Class<?> argumentClass) {
+               return (method, signature, accumulator, output) -> {
+                       // prepend the additional argument to the classes of the signature
+                       final Class<?>[] expectedParameters = new Class[signature.size() + 1];
+                       expectedParameters[0] = argumentClass;
+                       int position = 1;
+                       for (Class<?> parameter : signature) {
+                               expectedParameters[position++] = parameter;
+                       }
+                       if (isMethodInvokable(method, expectedParameters)) {
+                               return;
+                       }
+                       throw createMethodNotFoundError(expectedParameters, null);
+               };
+       }
+
+       /**
+        * Creates a verification that only checks the parameters of a method.
+        */
+       private MethodVerification createParameterVerification() {
+               return (method, signature, accumulator, output) -> {
+                       final Class<?>[] expectedParameters = signature.toArray(new Class[0]);
+                       if (isMethodInvokable(method, expectedParameters)) {
+                               return;
+                       }
+                       throw createMethodNotFoundError(expectedParameters, null);
+               };
+       }
+
+       private ValidationException createMethodNotFoundError(Class<?>[] 
parameters, @Nullable Class<?> returnType) {
+               final StringBuilder builder = new StringBuilder();
+               if (returnType != null) {
+                       builder.append(returnType.getName()).append(" ");
+               }
+               builder
+                       .append(methodName)
+                       .append(
+                               Stream.of(parameters)
+                                       .map(parameter -> {
+                                               // in case we don't know the 
parameter at this location (i.e. for accumulators)
+                                               if (parameter == null) {
+                                                       return "_";
+                                               } else {
+                                                       return 
parameter.getName();
+                                               }
+                                       })
+                                       .collect(Collectors.joining(", ", "(", 
")")));
+               return extractionError(
+                       "Considering all hints, the method should comply with 
the signature:\n%s",
+                       builder.toString());
+       }
+
+       private TypeInference extractTypeInference(
+                       SignatureExtraction signatureExtraction,
+                       @Nullable ResultExtraction accumulatorExtraction,
+                       ResultExtraction outputExtraction,
+                       MethodVerification verification) {
+               try {
+                       return extractTypeInferenceOrError(
+                               signatureExtraction,
+                               accumulatorExtraction,
+                               outputExtraction,
+                               verification
+                       );
+               } catch (Throwable t) {
+                       throw extractionError(
+                               t,
+                               "Could not extract a valid type inference from 
%s function class '%s'. " +
+                                       "Please check for implementation 
mistakes and/or provide a corresponding hint.",
+                               functionExplanation,
+                               function.getName());
+               }
+       }
+
+       /**
+        * Extracts the output mappings and, if an accumulator extraction is given, the accumulator
+        * mappings, and builds the type inference from them.
+        *
+        * @param accumulatorExtraction extraction for the accumulator or {@code null} if the function is
+        *     not accumulating
+        */
+       private TypeInference extractTypeInferenceOrError(
+                       SignatureExtraction signatureExtraction,
+                       @Nullable ResultExtraction accumulatorExtraction,
+                       ResultExtraction outputExtraction,
+                       MethodVerification verification) {
+
+               final Map<FunctionSignatureTemplate, FunctionResultTemplate> outputs;
+               try {
+                       outputs = extractResultMappings(signatureExtraction, outputExtraction, verification, false);
+               } catch (Throwable cause) {
+                       throw extractionError(cause, "Error in extracting a signature to output strategy.");
+               }
+
+               // function is not accumulating
+               if (accumulatorExtraction == null) {
+                       return buildInference(null, outputs);
+               }
+
+               final Map<FunctionSignatureTemplate, FunctionResultTemplate> accumulators;
+               try {
+                       accumulators = extractResultMappings(signatureExtraction, accumulatorExtraction, verification, true);
+               } catch (Throwable cause) {
+                       throw extractionError(cause, "Error in extracting a signature to accumulator strategy.");
+               }
+               return buildInference(accumulators, outputs);
+       }
+
+       /**
+        * Extracts mappings from signature to result (either accumulator or 
output) for the entire
+        * function. Verifies if the extracted inference matches with the 
implementation.
+        *
+        * <p>For example, from {@code (INT, BOOLEAN, ANY) -> INT}. It does 
this by going through all implementation
+        * methods and collecting all "per-method" mappings. The function 
mapping is the union of all "per-method"
+        * mappings.
+        */
+       private Map<FunctionSignatureTemplate, FunctionResultTemplate> 
extractResultMappings(
+                       SignatureExtraction signatureExtraction,
+                       ResultExtraction resultExtraction,
+                       MethodVerification verification,
+                       boolean isAccumulator) {
+               final Function<FunctionTemplate, FunctionResultTemplate> 
accessor;
+               if (isAccumulator) {
+                       accessor = FunctionTemplate::getAccumulatorTemplate;
+               } else {
+                       accessor = FunctionTemplate::getOutputTemplate;
+               }
+
+               final Set<FunctionTemplate> global = 
extractGlobalFunctionTemplates();
+               final Set<FunctionResultTemplate> globalResultOnly = 
findResultOnlyTemplates(global, accessor);
+
+               // for each method find a signature that maps to results
+               final Map<FunctionSignatureTemplate, FunctionResultTemplate> 
collectedMappings = new HashMap<>();
+               final List<Method> methods = collectMethods(function, 
methodName);
+               if (methods.size() == 0) {
+                       throw extractionError(
+                               "Could not find a publicly accessible method 
named '%s'.",
+                               methodName);
+               }
+               for (Method method : methods) {
+                       try {
+                               final Map<FunctionSignatureTemplate, 
FunctionResultTemplate> collectedMappingsPerMethod =
+                                       collectMethodMappings(method, global, 
globalResultOnly, signatureExtraction, resultExtraction, accessor);
+
+                               // check if the method can be called
+                               verifyMappingForMethod(method, 
collectedMappingsPerMethod, verification, isAccumulator);
+
+                               // check if method strategies conflict with 
function strategies
+                               collectedMappingsPerMethod.forEach((signature, 
result) -> putMapping(collectedMappings, signature, result));
+                       } catch (Throwable t) {
+                               throw extractionError(
+                                       t,
+                                       "Unable to extract a type inference 
from method:\n%s",
+                                       method.toString());
+                       }
+               }
+               return collectedMappings;
+       }
+
+       /**
+        * Extracts the mappings from signature to result (either accumulator or output) for the given
+        * method. Both the global hints of the entire function and the local hints of this method are
+        * considered.
+        *
+        * <p>The goal is to find an input signature for every declared result. A result that is not
+        * declared is extracted; the same applies to an input signature that is not declared.
+        */
+       private Map<FunctionSignatureTemplate, FunctionResultTemplate> collectMethodMappings(
+                       Method method,
+                       Set<FunctionTemplate> global,
+                       Set<FunctionResultTemplate> globalResultOnly,
+                       SignatureExtraction signatureExtraction,
+                       ResultExtraction resultExtraction,
+                       Function<FunctionTemplate, FunctionResultTemplate> accessor) {
+               final Set<FunctionTemplate> localTemplates = extractLocalFunctionTemplates(method);
+               final Set<FunctionResultTemplate> localResults = findResultOnlyTemplates(localTemplates, accessor);
+               final Set<FunctionTemplate> completeMappings = findResultMappingTemplates(global, localTemplates, accessor);
+               final FunctionResultTemplate singleResult = findResultOnlyTemplate(
+                       globalResultOnly,
+                       localResults,
+                       completeMappings,
+                       accessor);
+               final Set<FunctionSignatureTemplate> declaredSignatures = findInputOnlyTemplates(global, localTemplates, accessor);
+
+               final Map<FunctionSignatureTemplate, FunctionResultTemplate> methodMappings = new HashMap<>();
+
+               // 1. explicit mappings contain complete signatures and are always added
+               putExplicitMappings(methodMappings, completeMappings, declaredSignatures, accessor);
+               // 2. a result-only template is combined with explicit or extracted signatures
+               putUniqueResultMapping(methodMappings, singleResult, declaredSignatures, signatureExtraction, method);
+               // 3. a missing result is extracted and combined with explicit or extracted signatures
+               putExtractedResult(methodMappings, declaredSignatures, signatureExtraction, resultExtraction, method);
+
+               return methodMappings;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper methods (ordered by invocation order)
+       // 
--------------------------------------------------------------------------------------------
+
+       private Set<FunctionTemplate> extractGlobalFunctionTemplates() {
+               return asFunctionTemplates(lookup, 
collectAnnotationsOfClass(FunctionHint.class, function));
+       }
+
+       private Set<FunctionTemplate> extractLocalFunctionTemplates(Method 
method) {
+               return asFunctionTemplates(lookup, 
collectAnnotationsOfMethod(FunctionHint.class, method));
+       }
+
+       private static Set<FunctionTemplate> asFunctionTemplates(DataTypeLookup 
lookup, Set<FunctionHint> hints) {
+               return hints.stream()
+                       .map(hint -> {
+                               try {
+                                       return 
FunctionTemplate.fromAnnotation(lookup, hint);
+                               } catch (Throwable t) {
+                                       throw extractionError(t, "Error in 
function hint annotation.");
+                               }
+                       })
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Finds the results of all templates that declare a result but no signature.
+        */
+       private Set<FunctionResultTemplate> findResultOnlyTemplates(
+                       Set<FunctionTemplate> functionTemplates,
+                       Function<FunctionTemplate, FunctionResultTemplate> accessor) {
+               return functionTemplates.stream()
+                       .filter(template -> template.getSignatureTemplate() == null)
+                       .map(accessor)
+                       .filter(Objects::nonNull)
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Determines the single result (either accumulator or output) that is declared by result-only
+        * hints.
+        *
+        * @return the result if exactly one result-only hint exists and no explicit mapping declares a
+        *     different result; {@code null} if no result-only hint can be used
+        * @throws ValidationException if the hints lead to ambiguous results
+        */
+       private static @Nullable FunctionResultTemplate findResultOnlyTemplate(
+                       Set<FunctionResultTemplate> globalResultOnly,
+                       Set<FunctionResultTemplate> localResultOnly,
+                       Set<FunctionTemplate> explicitMappings,
+                       Function<FunctionTemplate, FunctionResultTemplate> accessor) {
+               final Set<FunctionResultTemplate> declaredResults =
+                       Stream.concat(globalResultOnly.stream(), localResultOnly.stream())
+                               .collect(Collectors.toSet());
+               final Set<FunctionResultTemplate> declaredAndMappedResults =
+                       Stream.concat(declaredResults.stream(), explicitMappings.stream().map(accessor))
+                               .collect(Collectors.toSet());
+
+               final boolean isUnique = declaredResults.size() == 1 && declaredAndMappedResults.size() == 1;
+               if (isUnique) {
+                       return declaredResults.stream().findFirst().orElse(null);
+               }
+
+               // different results are only fine as long as all of them come from a mapping
+               final boolean mixedWithMappings = !declaredResults.isEmpty() && !explicitMappings.isEmpty();
+               if (declaredResults.size() > 1 || mixedWithMappings) {
+                       throw extractionError("Function hints that lead to ambiguous results are not allowed.");
+               }
+               return null;
+       }
+
+       /**
+        * Hints that map a signature to a result.
+        */
+       private Set<FunctionTemplate> findResultMappingTemplates(
+                       Set<FunctionTemplate> globalTemplates,
+                       Set<FunctionTemplate> localTemplates,
+                       Function<FunctionTemplate, FunctionResultTemplate> 
accessor) {
+               return Stream.concat(globalTemplates.stream(), 
localTemplates.stream())
+                       .filter(t -> t.getSignatureTemplate() != null && 
accessor.apply(t) != null)
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Hints that only declare an input.
+        */
+       private static Set<FunctionSignatureTemplate> findInputOnlyTemplates(
+                       Set<FunctionTemplate> global,
+                       Set<FunctionTemplate> local,
+                       Function<FunctionTemplate, FunctionResultTemplate> 
accessor) {
+               return Stream.concat(global.stream(), local.stream())
+                       .filter(t ->
+                               t.getSignatureTemplate() != null &&
+                               accessor.apply(t) == null)
+                       .map(FunctionTemplate::getSignatureTemplate)
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Explicit mappings with complete signature to result declaration.
+        */
+       private static void putExplicitMappings(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> 
collectedMappings,
+                       Set<FunctionTemplate> explicitMappings,
+                       Set<FunctionSignatureTemplate> signatureOnly,
+                       Function<FunctionTemplate, FunctionResultTemplate> 
accessor) {
+               explicitMappings.forEach(t -> {
+                       // signature templates are valid everywhere and are 
added to the explicit mapping
+                       Stream.concat(signatureOnly.stream(), 
Stream.of(t.getSignatureTemplate()))
+                               .forEach(v -> putMapping(collectedMappings, v, 
accessor.apply(t)));
+               });
+       }
+
+       /**
+        * Adds the result-only template for the explicit signatures or, if none exist, for the signature
+        * extracted from the method. Does nothing if no unique result is given.
+        */
+       private static void putUniqueResultMapping(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings,
+                       @Nullable FunctionResultTemplate uniqueResult,
+                       Set<FunctionSignatureTemplate> signatureOnly,
+                       SignatureExtraction signatureExtraction,
+                       Method method) {
+               if (uniqueResult == null) {
+                       return;
+               }
+               // fall back to extraction if no input-only templates exist
+               if (signatureOnly.isEmpty()) {
+                       putMapping(collectedMappings, signatureExtraction.extract(method), uniqueResult);
+                       return;
+               }
+               // input-only templates are valid everywhere
+               for (FunctionSignatureTemplate signature : signatureOnly) {
+                       putMapping(collectedMappings, signature, uniqueResult);
+               }
+       }
+
+       /**
+        * Extracts the result from the method if no mapping has been collected so far and adds it for the
+        * explicit signatures or, if none exist, for the signature extracted from the method.
+        */
+       private static void putExtractedResult(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings,
+                       Set<FunctionSignatureTemplate> inputOnly,
+                       SignatureExtraction signatureExtraction,
+                       ResultExtraction resultExtraction,
+                       Method method) {
+               // hints already declared a result
+               if (!collectedMappings.isEmpty()) {
+                       return;
+               }
+               final FunctionResultTemplate extractedResult = resultExtraction.extract(method);
+               // fall back to extraction if no input-only templates exist
+               if (inputOnly.isEmpty()) {
+                       final FunctionSignatureTemplate extractedSignature = signatureExtraction.extract(method);
+                       putMapping(collectedMappings, extractedSignature, extractedResult);
+                       return;
+               }
+               // input-only templates are valid everywhere
+               for (FunctionSignatureTemplate signature : inputOnly) {
+                       putMapping(collectedMappings, signature, extractedResult);
+               }
+       }
+
+       /**
+        * Registers the result for the signature unless a result is already registered for it.
+        *
+        * @throws ValidationException if a different result is already registered for the signature
+        */
+       private static void putMapping(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings,
+                       FunctionSignatureTemplate signature,
+                       FunctionResultTemplate result) {
+               final FunctionResultTemplate previousResult = collectedMappings.putIfAbsent(signature, result);
+               // template must not conflict with same input
+               if (previousResult != null && !previousResult.equals(result)) {
+                       throw extractionError(
+                               "Function hints with same input definition but different result types are not allowed.");
+               }
+       }
+
+       /**
+        * Checks if the given method can be called and returns what hints 
declare.
+        */
+       private void verifyMappingForMethod(
+                       Method method,
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> 
collectedMappingsPerMethod,
+                       MethodVerification verification,
+                       boolean isAccumulator) {
+               collectedMappingsPerMethod.forEach((signature, result) -> {
+                       if (isAccumulator) {
+                               verification.verify(method, 
signature.toClass(), result.toClass(), null);
+                       } else {
+                               verification.verify(method, 
signature.toClass(), null, result.toClass());
+                       }
+               });
+       }
+
+       /**
+        * Assembles the {@link TypeInference} from the collected mappings.
+        *
+        * <p>Arguments and the input type strategy are derived from the output mapping. If an
+        * accumulator mapping is given, it must declare exactly the same signatures as the output
+        * mapping, otherwise an extraction error is thrown.
+        */
+       private static TypeInference buildInference(
+                       @Nullable Map<FunctionSignatureTemplate, FunctionResultTemplate> accumulatorMapping,
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> outputMapping) {
+               final TypeInference.Builder inferenceBuilder = TypeInference.newBuilder();
+
+               configureNamedAndTypedArguments(inferenceBuilder, outputMapping);
+
+               final InputTypeStrategy inputStrategy = translateInputTypeStrategy(outputMapping);
+               inferenceBuilder.inputTypeStrategy(inputStrategy);
+
+               if (accumulatorMapping != null) {
+                       // accumulator and output must be derived from the same input strategy
+                       final boolean sameSignatures = accumulatorMapping.keySet().equals(outputMapping.keySet());
+                       if (!sameSignatures) {
+                               throw extractionError(
+                                       "Mismatch between accumulator signature and output signature. " +
+                                               "Both intermediate and output results must be derived from the same input strategy.");
+                       }
+                       final TypeStrategy accumulatorStrategy = translateResultTypeStrategy(accumulatorMapping);
+                       inferenceBuilder.accumulatorTypeStrategy(accumulatorStrategy);
+               }
+
+               final TypeStrategy outputStrategy = translateResultTypeStrategy(outputMapping);
+               inferenceBuilder.outputTypeStrategy(outputStrategy);
+               return inferenceBuilder.build();
+       }
+
+       /**
+        * Declares typed and, if names are available, named arguments on the builder.
+        *
+        * <p>This only happens if the output mapping contains exactly one signature, that signature
+        * is not var-args, and every argument template of it carries a data type.
+        */
+       private static void configureNamedAndTypedArguments(
+                       TypeInference.Builder builder,
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> outputMapping) {
+               if (outputMapping.size() != 1) {
+                       return;
+               }
+               final FunctionSignatureTemplate onlySignature = outputMapping.keySet().iterator().next();
+               final List<DataType> argumentTypes = onlySignature.argumentTemplates.stream()
+                       .map(template -> template.dataType)
+                       .collect(Collectors.toList());
+               // var-args or an argument without a data type prevents typed arguments
+               if (onlySignature.isVarArgs || argumentTypes.stream().anyMatch(Objects::isNull)) {
+                       return;
+               }
+               builder.typedArguments(argumentTypes);
+               if (onlySignature.argumentNames == null) {
+                       return;
+               }
+               builder.namedArguments(Arrays.asList(onlySignature.argumentNames));
+       }
+
+       /**
+        * Converts every signature template into an input type strategy and every result template
+        * into a type strategy and combines the pairs via {@link TypeStrategies#mapping}.
+        */
+       private static TypeStrategy translateResultTypeStrategy(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> resultMapping) {
+               final Map<InputTypeStrategy, TypeStrategy> strategyPerInput =
+                       resultMapping.entrySet().stream().collect(
+                               Collectors.toMap(
+                                       mapping -> mapping.getKey().toInputTypeStrategy(),
+                                       mapping -> mapping.getValue().toTypeStrategy()));
+               return TypeStrategies.mapping(strategyPerInput);
+       }
+
+       /**
+        * Combines the input type strategies of all signatures with {@link InputTypeStrategies#or}.
+        *
+        * <p>Falls back to {@code InputTypeStrategies.sequence()} without arguments if the mapping
+        * contains no signature.
+        */
+       private static InputTypeStrategy translateInputTypeStrategy(
+                       Map<FunctionSignatureTemplate, FunctionResultTemplate> outputMapping) {
+               return outputMapping.keySet()
+                       .stream()
+                       .map(signature -> signature.toInputTypeStrategy())
+                       .reduce((left, right) -> InputTypeStrategies.or(left, right))
+                       .orElse(InputTypeStrategies.sequence());
+       }
 
 Review comment:
   Could we extract that to a separate class?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to