dawidwys commented on a change in pull request #10606: [FLINK-15009][table-common] Add a utility for creating type inference logic via reflection URL: https://github.com/apache/flink/pull/10606#discussion_r364728289
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/utils/FunctionMappingExtractor.java ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.extraction.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeLookup; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.extraction.DataTypeExtractor; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.extraction.utils.ExtractionUtils.collectMethods; +import static org.apache.flink.table.types.extraction.utils.ExtractionUtils.extractionError; +import static 
org.apache.flink.table.types.extraction.utils.ExtractionUtils.isAssignable; +import static org.apache.flink.table.types.extraction.utils.ExtractionUtils.isMethodInvokable; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.extractGlobalFunctionTemplates; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.extractLocalFunctionTemplates; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.findInputOnlyTemplates; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.findResultMappingTemplates; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.findResultOnlyTemplate; +import static org.apache.flink.table.types.extraction.utils.TemplateUtils.findResultOnlyTemplates; + +/** + * Utility for extracting function mappings from signature to result, e.g. from (INT, STRING) to BOOLEAN. + */ +@Internal +public final class FunctionMappingExtractor { + + private final DataTypeLookup lookup; + + private final Class<? extends UserDefinedFunction> function; + + private final String methodName; + + private final SignatureExtraction signatureExtraction; + + private final @Nullable ResultExtraction accumulatorExtraction; + + private final ResultExtraction outputExtraction; + + private final MethodVerification verification; + + public FunctionMappingExtractor( + DataTypeLookup lookup, + Class<? extends UserDefinedFunction> function, + String methodName, + SignatureExtraction signatureExtraction, + @Nullable ResultExtraction accumulatorExtraction, + ResultExtraction outputExtraction, + MethodVerification verification) { + this.lookup = lookup; + this.function = function; + this.methodName = methodName; + this.signatureExtraction = signatureExtraction; + this.accumulatorExtraction = accumulatorExtraction; + this.outputExtraction = outputExtraction; + this.verification = verification; + } + + public Class<? 
extends UserDefinedFunction> getFunction() { + return function; + } + + public boolean hasAccumulator() { + return accumulatorExtraction != null; + } + + public Map<FunctionSignatureTemplate, FunctionResultTemplate> extractOutputMapping() { + try { + return extractResultMappings( + outputExtraction, + FunctionTemplate::getOutputTemplate, + verification); + } catch (Throwable t) { + throw extractionError(t, "Error in extracting a signature to output mapping."); + } + } + + public Map<FunctionSignatureTemplate, FunctionResultTemplate> extractAccumulatorMapping() { + Preconditions.checkState(hasAccumulator()); + try { + return extractResultMappings( + accumulatorExtraction, + FunctionTemplate::getAccumulatorTemplate, + (method, signature, result) -> { + // put the result into the signature for accumulators + final List<Class<?>> arguments = Stream.concat(Stream.of(result), signature.stream()) + .collect(Collectors.toList()); + verification.verify(method, arguments, null); + }); + } catch (Throwable t) { + throw extractionError(t, "Error in extracting a signature to accumulator mapping."); + } + } + + /** + * Extracts mappings from signature to result (either accumulator or output) for the entire + * function. Verifies if the extracted inference matches with the implementation. + * + * <p>For example, from {@code (INT, BOOLEAN, ANY) -> INT}. It does this by going through all implementation + * methods and collecting all "per-method" mappings. The function mapping is the union of all "per-method" + * mappings. 
+ */ + private Map<FunctionSignatureTemplate, FunctionResultTemplate> extractResultMappings( + ResultExtraction resultExtraction, + Function<FunctionTemplate, FunctionResultTemplate> accessor, + MethodVerification verification) { + final Set<FunctionTemplate> global = extractGlobalFunctionTemplates(lookup, function); + final Set<FunctionResultTemplate> globalResultOnly = findResultOnlyTemplates(global, accessor); + + // for each method find a signature that maps to results + final Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings = new HashMap<>(); + final List<Method> methods = collectMethods(function, methodName); + if (methods.size() == 0) { + throw extractionError( + "Could not find a publicly accessible method named '%s'.", + methodName); + } + for (Method method : methods) { + try { + final Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappingsPerMethod = + collectMethodMappings(method, global, globalResultOnly, resultExtraction, accessor); + + // check if the method can be called + verifyMappingForMethod(method, collectedMappingsPerMethod, verification); + + // check if method strategies conflict with function strategies + collectedMappingsPerMethod.forEach((signature, result) -> putMapping(collectedMappings, signature, result)); + } catch (Throwable t) { + throw extractionError( + t, + "Unable to extract a type inference from method:\n%s", + method.toString()); + } + } + return collectedMappings; + } + + /** + * Extracts mappings from signature to result (either accumulator or output) for the given method. It + * considers both global hints for the entire function and local hints just for this method. + * + * <p>The algorithm aims to find an input signature for every declared result. If no result is + * declared, it will be extracted. If no input signature is declared, it will be extracted. 
+ */ + private Map<FunctionSignatureTemplate, FunctionResultTemplate> collectMethodMappings( + Method method, + Set<FunctionTemplate> global, + Set<FunctionResultTemplate> globalResultOnly, + ResultExtraction resultExtraction, + Function<FunctionTemplate, FunctionResultTemplate> accessor) { + final Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappingsPerMethod = new HashMap<>(); + final Set<FunctionTemplate> local = extractLocalFunctionTemplates(lookup, method); + + final Set<FunctionResultTemplate> localResultOnly = findResultOnlyTemplates( + local, + accessor); + + final Set<FunctionTemplate> explicitMappings = findResultMappingTemplates( + global, + local, + accessor); + + final FunctionResultTemplate resultOnly = findResultOnlyTemplate( + globalResultOnly, + localResultOnly, + explicitMappings, + accessor); + + final Set<FunctionSignatureTemplate> inputOnly = findInputOnlyTemplates(global, local, accessor); + + // add all explicit mappings because they contain complete signatures + putExplicitMappings(collectedMappingsPerMethod, explicitMappings, inputOnly, accessor); + // add result only template with explicit or extracted signatures + putUniqueResultMappings(collectedMappingsPerMethod, resultOnly, inputOnly, signatureExtraction, method); + // handle missing result by extraction with explicit or extracted signatures + putExtractedResultMappings(collectedMappingsPerMethod, inputOnly, signatureExtraction, resultExtraction, method); + + return collectedMappingsPerMethod; + } + + // -------------------------------------------------------------------------------------------- + // Helper methods (ordered by invocation order) + // -------------------------------------------------------------------------------------------- + + /** + * Explicit mappings with complete signature to result declaration. 
+ */ + private void putExplicitMappings( + Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings, + Set<FunctionTemplate> explicitMappings, + Set<FunctionSignatureTemplate> signatureOnly, + Function<FunctionTemplate, FunctionResultTemplate> accessor) { + explicitMappings.forEach(t -> { + // signature templates are valid everywhere and are added to the explicit mapping + Stream.concat(signatureOnly.stream(), Stream.of(t.getSignatureTemplate())) + .forEach(v -> putMapping(collectedMappings, v, accessor.apply(t))); + }); + } + + /** + * Result only template with explicit or extracted signatures. + */ + private void putUniqueResultMappings( + Map<FunctionSignatureTemplate, FunctionResultTemplate> collectedMappings, + @Nullable FunctionResultTemplate uniqueResult, + Set<FunctionSignatureTemplate> signatureOnly, + SignatureExtraction signatureExtraction, Review comment: Use the `signatureExtraction` from the field. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
