This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ac109b0b4f [FLINK-36704][table-common] Update TypeInference with 
StaticArgument and StateTypeStrategy
3ac109b0b4f is described below

commit 3ac109b0b4faea35bd58bd16e7c0657b80be65ac
Author: Timo Walther <[email protected]>
AuthorDate: Mon Nov 25 16:09:47 2024 +0100

    [FLINK-36704][table-common] Update TypeInference with StaticArgument and 
StateTypeStrategy
---
 .../src/test/resources/sql/function.q              |  26 +-
 .../table/functions/BuiltInFunctionDefinition.java |  10 +
 .../{TypeStrategy.java => StateTypeStrategy.java}  |  19 +-
 .../types/inference/StateTypeStrategyWrapper.java  |  66 +++++
 .../table/types/inference/StaticArgument.java      | 165 +++++++++++
 .../table/types/inference/StaticArgumentTrait.java |  67 +++++
 .../flink/table/types/inference/TypeInference.java | 302 ++++++++++++++-------
 .../flink/table/types/inference/TypeStrategy.java  |   3 +-
 .../extraction/TypeInferenceExtractorTest.java     |  16 +-
 .../types/inference/InputTypeStrategiesTest.java   |   2 +-
 10 files changed, 538 insertions(+), 138 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q 
b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index 270ad9ff1f1..4cc0c043dc2 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -406,7 +406,7 @@ describe function extended temp_upperudf;
 |              requirements |                                         
$VAR_UDF_JAR_PATH_SPACE [] |
 |          is deterministic |                                       
$VAR_UDF_JAR_PATH_SPACE true |
 | supports constant folding |                                       
$VAR_UDF_JAR_PATH_SPACE true |
-|                 signature |                $VAR_UDF_JAR_PATH_SPACE 
c1.db.temp_upperudf(STRING) |
+|                 signature |        $VAR_UDF_JAR_PATH_SPACE 
c1.db.temp_upperudf(arg0 => STRING) |
 
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
 10 rows in set
 !ok
@@ -437,7 +437,7 @@ desc function extended temp_upperudf;
 |              requirements |                                         
$VAR_UDF_JAR_PATH_SPACE [] |
 |          is deterministic |                                       
$VAR_UDF_JAR_PATH_SPACE true |
 | supports constant folding |                                       
$VAR_UDF_JAR_PATH_SPACE true |
-|                 signature |                $VAR_UDF_JAR_PATH_SPACE 
c1.db.temp_upperudf(STRING) |
+|                 signature |        $VAR_UDF_JAR_PATH_SPACE 
c1.db.temp_upperudf(arg0 => STRING) |
 
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
 10 rows in set
 !ok
@@ -489,16 +489,16 @@ describe function `c1`.`db`.temp_upperudf;
 !ok
 
 describe function extended temp_upperudf;
-+---------------------------+-----------------------+
-|                 info name |            info value |
-+---------------------------+-----------------------+
-|        is system function |                  true |
-|              is temporary |                  true |
-|                      kind |                SCALAR |
-|              requirements |                    [] |
-|          is deterministic |                  true |
-| supports constant folding |                  true |
-|                 signature | temp_upperudf(STRING) |
-+---------------------------+-----------------------+
++---------------------------+-------------------------------+
+|                 info name |                    info value |
++---------------------------+-------------------------------+
+|        is system function |                          true |
+|              is temporary |                          true |
+|                      kind |                        SCALAR |
+|              requirements |                            [] |
+|          is deterministic |                          true |
+| supports constant folding |                          true |
+|                 signature | temp_upperudf(arg0 => STRING) |
++---------------------------+-------------------------------+
 7 rows in set
 !ok
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
index fce8c4664fa..1f6a195ee45 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.StaticArgument;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategy;
 
@@ -292,11 +293,20 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
             return this;
         }
 
+        public Builder staticArguments(StaticArgument... staticArguments) {
+            this.typeInferenceBuilder.staticArguments(staticArguments);
+            return this;
+        }
+
+        /** @deprecated Use {@link #staticArguments(StaticArgument...)} 
instead. */
+        @Deprecated
         public Builder namedArguments(String... argumentNames) {
             
this.typeInferenceBuilder.namedArguments(Arrays.asList(argumentNames));
             return this;
         }
 
+        /** @deprecated Use {@link #staticArguments(StaticArgument...)} 
instead. */
+        @Deprecated
         public Builder typedArguments(DataType... argumentTypes) {
             
this.typeInferenceBuilder.typedArguments(Arrays.asList(argumentTypes));
             return this;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
similarity index 63%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
index bdb6299f1f5..ae817054f0b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
@@ -19,22 +19,9 @@
 package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.types.DataType;
 
-import java.util.Optional;
-
-/**
- * Strategy for inferring the data type of a function call. The inferred type 
might describe the
- * final result or an intermediate result (accumulation type) of a function.
- *
- * <p>Note: Implementations should implement {@link Object#hashCode()} and 
{@link
- * Object#equals(Object)}.
- *
- * @see TypeStrategies
- */
+/** Strategy for inferring a function call's intermediate result data type 
(i.e. state entry). */
 @PublicEvolving
-public interface TypeStrategy {
-
-    /** Infers a type from the given function call. */
-    Optional<DataType> inferType(CallContext callContext);
+public interface StateTypeStrategy extends TypeStrategy {
+    // marker interface which will be filled with additional contracts in the 
future
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
new file mode 100644
index 00000000000..10d6b3b9f7c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/** A helper class that wraps a {@link TypeStrategy} into a {@link 
StateTypeStrategy}. */
+@PublicEvolving
+public class StateTypeStrategyWrapper implements StateTypeStrategy {
+
+    private final TypeStrategy typeStrategy;
+
+    private StateTypeStrategyWrapper(TypeStrategy typeStrategy) {
+        this.typeStrategy =
+                Preconditions.checkNotNull(typeStrategy, "Type strategy must 
not be null.");
+    }
+
+    public static StateTypeStrategyWrapper of(TypeStrategy typeStrategy) {
+        return new StateTypeStrategyWrapper(typeStrategy);
+    }
+
+    @Override
+    public Optional<DataType> inferType(CallContext callContext) {
+        return typeStrategy.inferType(callContext);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof StateTypeStrategyWrapper) {
+            return Objects.equals(typeStrategy, ((StateTypeStrategyWrapper) 
o).typeStrategy);
+        }
+        if (o instanceof TypeStrategy) {
+            return Objects.equals(typeStrategy, o);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return typeStrategy.hashCode();
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
new file mode 100644
index 00000000000..ad791585c78
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.EnumSet;
+import java.util.Optional;
+
+/**
+ * Describes an argument in a static signature that is not overloaded and does 
not support varargs.
+ *
+ * <p>Static arguments are a special case of an input type strategy. While 
built-in functions often
+ * require advanced type inference strategies (taking data type families, 
common type constraints
+ * between arguments, customized validation), many functions are fine with a 
static signature.
+ * Static arguments power these basic use cases.
+ *
+ * <p>Static arguments can take tables, models, or scalar values. Each 
argument takes a set of
+ * {@link StaticArgumentTrait} that enable basic validation by the framework.
+ */
+@PublicEvolving
+public class StaticArgument {
+
+    private final String name;
+    private final @Nullable DataType dataType;
+    private final @Nullable Class<?> conversionClass;
+    private final boolean isOptional;
+    private final EnumSet<StaticArgumentTrait> traits;
+
+    private StaticArgument(
+            String name,
+            @Nullable DataType dataType,
+            @Nullable Class<?> conversionClass,
+            boolean isOptional,
+            EnumSet<StaticArgumentTrait> traits) {
+        StaticArgumentTrait.checkIntegrity(
+                Preconditions.checkNotNull(traits, "Traits must not be 
null."));
+        this.name = Preconditions.checkNotNull(name, "Name must not be null.");
+        this.dataType = dataType;
+        this.conversionClass = conversionClass;
+        this.isOptional = isOptional;
+        this.traits = traits;
+    }
+
+    /**
+     * Declares a scalar argument such as {@code f(12)} or {@code 
f(otherColumn)}.
+     *
+     * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+     * @param dataType explicit type to which the argument is cast if necessary
+     * @param isOptional whether the argument is optional, if optional the 
corresponding data type
+     *     must be nullable
+     */
+    public static StaticArgument scalar(String name, DataType dataType, 
boolean isOptional) {
+        Preconditions.checkNotNull(dataType, "Data type must not be null.");
+        return new StaticArgument(
+                name, dataType, null, isOptional, 
EnumSet.of(StaticArgumentTrait.SCALAR));
+    }
+
+    /**
+     * Declares a table argument such as {@code f(t => myTable)} or {@code f(t 
=> TABLE myTable))}.
+     *
+     * <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} 
(default) or {@link
+     * StaticArgumentTrait#TABLE_AS_SET} semantics.
+     *
+     * <p>By only providing a conversion class, the argument supports a 
"polymorphic" behavior. In
+     * other words: it accepts tables with an arbitrary number of columns with 
arbitrary data types.
+     * For this case, a class satisfying {@link 
RowType#supportsOutputConversion(Class)} must be
+     * used.
+     *
+     * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+     * @param conversionClass a class satisfying {@link 
RowType#supportsOutputConversion(Class)}
+     * @param isOptional whether the argument is optional
+     * @param traits set of {@link StaticArgumentTrait} requiring {@link 
StaticArgumentTrait#TABLE}
+     */
+    public static StaticArgument table(
+            String name,
+            Class<?> conversionClass,
+            boolean isOptional,
+            EnumSet<StaticArgumentTrait> traits) {
+        Preconditions.checkNotNull(conversionClass, "Conversion class must not 
be null.");
+        final EnumSet<StaticArgumentTrait> enrichedTraits = 
EnumSet.copyOf(traits);
+        enrichedTraits.add(StaticArgumentTrait.TABLE);
+        if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
+            enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
+        }
+        return new StaticArgument(name, null, conversionClass, isOptional, 
enrichedTraits);
+    }
+
+    /**
+     * Declares a table argument such as {@code f(t => myTable)} or {@code f(t 
=> TABLE myTable))}.
+     *
+     * <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW} 
(default) or {@link
+     * StaticArgumentTrait#TABLE_AS_SET} semantics.
+     *
+     * <p>By providing a concrete data type, the argument only accepts tables 
with corresponding
+     * number of columns and data types. The data type must be a {@link 
RowType} or {@link
+     * StructuredType}.
+     *
+     * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+     * @param dataType explicit type to which the argument is cast if necessary
+     * @param isOptional whether the argument is optional, if optional the 
corresponding data type
+     *     must be nullable
+     * @param traits set of {@link StaticArgumentTrait} requiring {@link 
StaticArgumentTrait#TABLE}
+     */
+    public static StaticArgument table(
+            String name,
+            DataType dataType,
+            boolean isOptional,
+            EnumSet<StaticArgumentTrait> traits) {
+        Preconditions.checkNotNull(dataType, "Data type must not be null.");
+        return new StaticArgument(name, dataType, null, isOptional, 
enrichTableTraits(traits));
+    }
+
+    private static EnumSet<StaticArgumentTrait> enrichTableTraits(
+            EnumSet<StaticArgumentTrait> traits) {
+        final EnumSet<StaticArgumentTrait> enrichedTraits = 
EnumSet.copyOf(traits);
+        enrichedTraits.add(StaticArgumentTrait.TABLE);
+        if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
+            enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
+        }
+        return enrichedTraits;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Optional<DataType> getDataType() {
+        return Optional.ofNullable(dataType);
+    }
+
+    public Optional<Class<?>> getConversionClass() {
+        return Optional.ofNullable(conversionClass);
+    }
+
+    public boolean isOptional() {
+        return isOptional;
+    }
+
+    public EnumSet<StaticArgumentTrait> getTraits() {
+        return traits;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
new file mode 100644
index 00000000000..76a4e6e2690
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Declares traits for {@link StaticArgument}. They enable basic validation by 
the framework.
+ *
+ * <p>Some traits have dependencies to other traits, which is why this enum 
reflects a hierarchy in
+ * which {@link #SCALAR}, {@link #TABLE}, and {@link #MODEL} are the top-level 
roots.
+ */
+@PublicEvolving
+public enum StaticArgumentTrait {
+    SCALAR(),
+    TABLE(),
+    MODEL(),
+    TABLE_AS_ROW(TABLE),
+    TABLE_AS_SET(TABLE),
+    OPTIONAL_PARTITION_BY(TABLE_AS_SET);
+
+    private final Set<StaticArgumentTrait> requirements;
+
+    StaticArgumentTrait(StaticArgumentTrait... requirements) {
+        this.requirements = 
Arrays.stream(requirements).collect(Collectors.toSet());
+    }
+
+    public static void checkIntegrity(EnumSet<StaticArgumentTrait> traits) {
+        if (traits.stream().filter(t -> t.requirements.isEmpty()).count() != 
1) {
+            throw new ValidationException(
+                    "Invalid argument traits. An argument must be declared as 
either scalar, table, or model.");
+        }
+        traits.forEach(
+                trait ->
+                        trait.requirements.forEach(
+                                requirement -> {
+                                    if (!traits.contains(requirement)) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "Invalid argument 
traits. Trait %s requires %s.",
+                                                        trait, requirement));
+                                    }
+                                }));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
index e7bdaf77a93..f302036fb57 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
@@ -25,17 +25,20 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Provides logic for the type inference of function calls. It includes:
  *
  * <ul>
- *   <li>explicit input specification for (possibly named and/or typed) 
arguments
+ *   <li>explicit input specification for static arguments
  *   <li>inference of missing or incomplete input types
  *   <li>validation of input types
- *   <li>inference of an intermediate accumulation type
+ *   <li>inference of intermediate result types (i.e. state entries)
  *   <li>inference of the final output type
  * </ul>
  *
@@ -44,39 +47,20 @@ import java.util.Optional;
 @PublicEvolving
 public final class TypeInference {
 
-    private final @Nullable List<String> namedArguments;
-
-    private final @Nullable List<Boolean> optionalArguments;
-
-    private final @Nullable List<DataType> typedArguments;
-
+    private final @Nullable List<StaticArgument> staticArguments;
     private final InputTypeStrategy inputTypeStrategy;
-
-    private final @Nullable TypeStrategy accumulatorTypeStrategy;
-
+    private final LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies;
     private final TypeStrategy outputTypeStrategy;
 
     private TypeInference(
-            @Nullable List<String> namedArguments,
-            @Nullable List<Boolean> optionalArguments,
-            @Nullable List<DataType> typedArguments,
+            @Nullable List<StaticArgument> staticArguments,
             InputTypeStrategy inputTypeStrategy,
-            @Nullable TypeStrategy accumulatorTypeStrategy,
+            LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies,
             TypeStrategy outputTypeStrategy) {
-        this.namedArguments = namedArguments;
-        this.optionalArguments = optionalArguments;
-        this.typedArguments = typedArguments;
+        this.staticArguments = staticArguments;
         this.inputTypeStrategy = inputTypeStrategy;
-        this.accumulatorTypeStrategy = accumulatorTypeStrategy;
+        this.stateTypeStrategies = stateTypeStrategies;
         this.outputTypeStrategy = outputTypeStrategy;
-        if (namedArguments != null
-                && typedArguments != null
-                && namedArguments.size() != typedArguments.size()) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Mismatch between typed arguments %d and named 
argument %d.",
-                            namedArguments.size(), typedArguments.size()));
-        }
     }
 
     /** Builder for configuring and creating instances of {@link 
TypeInference}. */
@@ -84,50 +68,177 @@ public final class TypeInference {
         return new TypeInference.Builder();
     }
 
-    public Optional<List<String>> getNamedArguments() {
-        return Optional.ofNullable(namedArguments);
-    }
-
-    public Optional<List<DataType>> getTypedArguments() {
-        return Optional.ofNullable(typedArguments);
-    }
-
-    public Optional<List<Boolean>> getOptionalArguments() {
-        return Optional.ofNullable(optionalArguments);
+    public Optional<List<StaticArgument>> getStaticArguments() {
+        return Optional.ofNullable(staticArguments);
     }
 
     public InputTypeStrategy getInputTypeStrategy() {
         return inputTypeStrategy;
     }
 
-    public Optional<TypeStrategy> getAccumulatorTypeStrategy() {
-        return Optional.ofNullable(accumulatorTypeStrategy);
+    public LinkedHashMap<String, StateTypeStrategy> getStateTypeStrategies() {
+        return stateTypeStrategies;
     }
 
     public TypeStrategy getOutputTypeStrategy() {
         return outputTypeStrategy;
     }
 
+    /** @deprecated Use {@link #getStaticArguments()} instead. */
+    @Deprecated
+    public Optional<List<String>> getNamedArguments() {
+        return Optional.ofNullable(staticArguments)
+                .map(
+                        args ->
+                                args.stream()
+                                        .map(StaticArgument::getName)
+                                        .collect(Collectors.toList()));
+    }
+
+    /** @deprecated Use {@link #getStaticArguments()} instead. */
+    @Deprecated
+    public Optional<List<DataType>> getTypedArguments() {
+        return Optional.ofNullable(staticArguments)
+                .map(
+                        args ->
+                                args.stream()
+                                        .map(
+                                                arg ->
+                                                        arg.getDataType()
+                                                                .orElseThrow(
+                                                                        () ->
+                                                                               
 new IllegalArgumentException(
+                                                                               
         "Scalar argument with a data type expected.")))
+                                        .collect(Collectors.toList()));
+    }
+
+    /** @deprecated Use {@link #getStaticArguments()} instead. */
+    @Deprecated
+    public Optional<List<Boolean>> getOptionalArguments() {
+        return Optional.ofNullable(staticArguments)
+                .map(
+                        args ->
+                                args.stream()
+                                        .map(StaticArgument::isOptional)
+                                        .collect(Collectors.toList()));
+    }
+
+    /** @deprecated Use {@link #getStateTypeStrategies()} instead. */
+    @Deprecated
+    public Optional<TypeStrategy> getAccumulatorTypeStrategy() {
+        if (stateTypeStrategies.isEmpty()) {
+            return Optional.empty();
+        }
+        if (stateTypeStrategies.size() != 1) {
+            throw new IllegalArgumentException(
+                    "An accumulator should contain exactly one state type 
strategy.");
+        }
+        return Optional.of(stateTypeStrategies.values().iterator().next());
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Builder
     // 
--------------------------------------------------------------------------------------------
 
     /** Builder for configuring and creating instances of {@link 
TypeInference}. */
     @PublicEvolving
     public static class Builder {
 
-        private @Nullable List<String> namedArguments;
+        private @Nullable List<StaticArgument> staticArguments;
+        private InputTypeStrategy inputTypeStrategy = 
InputTypeStrategies.WILDCARD;
+        private LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies =
+                new LinkedHashMap<>();
+        private @Nullable TypeStrategy outputTypeStrategy;
 
+        // Legacy
+        private @Nullable List<String> namedArguments;
         private @Nullable List<Boolean> optionalArguments;
-
         private @Nullable List<DataType> typedArguments;
 
-        private InputTypeStrategy inputTypeStrategy = 
InputTypeStrategies.WILDCARD;
+        public Builder() {
+            // default constructor to allow a fluent definition
+        }
 
-        private @Nullable TypeStrategy accumulatorTypeStrategy;
+        /**
+         * Sets a list of arguments in a static signature that is not 
overloaded and does not
+         * support varargs.
+         *
+         * <p>Static arguments are a special case of an input type strategy 
and takes precedence. A
+         * signature can take tables, models, or scalar values. It allows 
optional and/or named
+         * argument like {@code f(myArg => 12)}.
+         */
+        public Builder staticArguments(StaticArgument... staticArguments) {
+            this.staticArguments = Arrays.asList(staticArguments);
+            return this;
+        }
 
-        private @Nullable TypeStrategy outputTypeStrategy;
+        /**
+         * Sets a list of arguments in a static signature that is not 
overloaded and does not
+         * support varargs.
+         *
+         * <p>Static arguments are a special case of an input type strategy 
and takes precedence. A
+         * signature can take tables, models, or scalar values. It allows 
optional and/or named
+         * argument like {@code f(myArg => 12)}.
+         */
+        public Builder staticArguments(List<StaticArgument> staticArgument) {
+            this.staticArguments = staticArgument;
+            return this;
+        }
 
-        public Builder() {
-            // default constructor to allow a fluent definition
+        /**
+         * Sets the strategy for inferring and validating input arguments in a 
function call.
+         *
+         * <p>A {@link InputTypeStrategies#WILDCARD} strategy function is 
assumed by default.
+         */
+        public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) {
+            this.inputTypeStrategy =
+                    Preconditions.checkNotNull(
+                            inputTypeStrategy, "Input type strategy must not 
be null.");
+            return this;
+        }
+
+        /**
+         * Sets the strategy for inferring the intermediate accumulator data 
type of an aggregate
+         * function call.
+         */
+        public Builder accumulatorTypeStrategy(TypeStrategy 
accumulatorTypeStrategy) {
+            Preconditions.checkNotNull(
+                    accumulatorTypeStrategy, "Accumulator type strategy must 
not be null.");
+            this.stateTypeStrategies.put(
+                    "acc", 
StateTypeStrategyWrapper.of(accumulatorTypeStrategy));
+            return this;
+        }
+
+        /**
+         * Sets a map of state names to {@link StateTypeStrategy}s for 
inferring a function call's
+         * intermediate result data types (i.e. state entries). For aggregate 
functions, only one
+         * entry is allowed which defines the accumulator's data type.
+         */
+        public Builder stateTypeStrategies(
+                LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies) {
+            this.stateTypeStrategies = stateTypeStrategies;
+            return this;
+        }
+
+        /**
+         * Sets the strategy for inferring the final output data type of a 
function call.
+         *
+         * <p>Required.
+         */
+        public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
+            this.outputTypeStrategy =
+                    Preconditions.checkNotNull(
+                            outputTypeStrategy, "Output type strategy must not 
be null.");
+            return this;
+        }
+
+        public TypeInference build() {
+            return new TypeInference(
+                    createStaticArguments(),
+                    inputTypeStrategy,
+                    stateTypeStrategies,
+                    Preconditions.checkNotNull(
+                            outputTypeStrategy, "Output type strategy must not 
be null."));
         }
 
         /**
@@ -137,7 +248,10 @@ public final class TypeInference {
          * <p>This information is useful for SQL's concept of named arguments 
using the assignment
          * operator (e.g. {@code FUNC(max => 42)}). The names are used for 
reordering the call's
          * arguments to the formal argument order of the function.
+         *
+         * @deprecated Use {@link #staticArguments(List)} instead.
          */
+        @Deprecated
         public Builder namedArguments(List<String> argumentNames) {
             this.namedArguments =
                     Preconditions.checkNotNull(
@@ -145,7 +259,11 @@ public final class TypeInference {
             return this;
         }
 
-        /** @see #namedArguments(List) */
+        /**
+         * @see #namedArguments(List)
+         * @deprecated Use {@link #staticArguments(StaticArgument...)} instead.
+         */
+        @Deprecated
         public Builder namedArguments(String... argumentNames) {
             return namedArguments(Arrays.asList(argumentNames));
         }
@@ -157,7 +275,10 @@ public final class TypeInference {
          * <p>This information is useful for SQL's concept of named arguments 
using the assignment
          * operator. The optionals are used to determine whether an argument 
is optional or required
          * in the function call.
+         *
+         * @deprecated Use {@link #staticArguments(List)} instead.
          */
+        @Deprecated
         public Builder optionalArguments(List<Boolean> optionalArguments) {
             this.optionalArguments =
                     Preconditions.checkNotNull(
@@ -171,8 +292,10 @@ public final class TypeInference {
          *
          * <p>This information is useful for optional arguments with default 
value. In particular,
          * the number of arguments that need to be filled with a default value 
and their types is
-         * important.
+         *
+         * @deprecated Use {@link #staticArguments(List)} instead.
          */
+        @Deprecated
         public Builder typedArguments(List<DataType> argumentTypes) {
             this.typedArguments =
                     Preconditions.checkNotNull(
@@ -180,55 +303,48 @@ public final class TypeInference {
             return this;
         }
 
-        /** @see #typedArguments(List) */
-        public Builder typedArguments(DataType... argumentTypes) {
-            return typedArguments(Arrays.asList(argumentTypes));
-        }
-
-        /**
-         * Sets the strategy for inferring and validating input arguments in a 
function call.
-         *
-         * <p>A {@link InputTypeStrategies#WILDCARD} strategy function is 
assumed by default.
-         */
-        public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) {
-            this.inputTypeStrategy =
-                    Preconditions.checkNotNull(
-                            inputTypeStrategy, "Input type strategy must not 
be null.");
-            return this;
-        }
-
-        /**
-         * Sets the strategy for inferring the intermediate accumulator data 
type of a function
-         * call.
-         */
-        public Builder accumulatorTypeStrategy(TypeStrategy 
accumulatorTypeStrategy) {
-            this.accumulatorTypeStrategy =
-                    Preconditions.checkNotNull(
-                            accumulatorTypeStrategy, "Accumulator type 
strategy must not be null.");
-            return this;
-        }
-
         /**
-         * Sets the strategy for inferring the final output data type of a 
function call.
-         *
-         * <p>Required.
+         * @see #typedArguments(List)
+         * @deprecated Use {@link #staticArguments(StaticArgument...)} instead.
          */
-        public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
-            this.outputTypeStrategy =
-                    Preconditions.checkNotNull(
-                            outputTypeStrategy, "Output type strategy must not 
be null.");
-            return this;
+        @Deprecated
+        public Builder typedArguments(DataType... argumentTypes) {
+            return typedArguments(Arrays.asList(argumentTypes));
         }
 
-        public TypeInference build() {
-            return new TypeInference(
-                    namedArguments,
-                    optionalArguments,
-                    typedArguments,
-                    inputTypeStrategy,
-                    accumulatorTypeStrategy,
-                    Preconditions.checkNotNull(
-                            outputTypeStrategy, "Output type strategy must not 
be null."));
+        private @Nullable List<StaticArgument> createStaticArguments() {
+            if (staticArguments != null) {
+                return staticArguments;
+            }
+            // Legacy path
+            if (typedArguments != null) {
+                if (namedArguments != null && namedArguments.size() != 
typedArguments.size()) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Mismatch between typed arguments %d and 
named arguments %d.",
+                                    typedArguments.size(), 
namedArguments.size()));
+                }
+                if (optionalArguments != null
+                        && optionalArguments.size() != typedArguments.size()) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Mismatch between typed arguments %d and 
optional arguments %d.",
+                                    typedArguments.size(), 
optionalArguments.size()));
+                }
+                return IntStream.range(0, typedArguments.size())
+                        .mapToObj(
+                                pos ->
+                                        StaticArgument.scalar(
+                                                
Optional.ofNullable(namedArguments)
+                                                        .map(args -> 
args.get(pos))
+                                                        .orElse("arg" + pos),
+                                                typedArguments.get(pos),
+                                                
Optional.ofNullable(optionalArguments)
+                                                        .map(args -> 
args.get(pos))
+                                                        .orElse(false)))
+                        .collect(Collectors.toList());
+            }
+            return null;
         }
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
index bdb6299f1f5..5b7924e1653 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
@@ -24,8 +24,7 @@ import org.apache.flink.table.types.DataType;
 import java.util.Optional;
 
 /**
- * Strategy for inferring the data type of a function call. The inferred type 
might describe the
- * final result or an intermediate result (accumulation type) of a function.
+ * Strategy for inferring a function call's result data type.
  *
  * <p>Note: Implementations should implement {@link Object#hashCode()} and 
{@link
  * Object#equals(Object)}.
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
index dc3b36c9320..a438ef86cc0 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
@@ -433,7 +433,6 @@ class TypeInferenceExtractorTest {
 
                 // scalar function that takes any input
                 TestSpec.forScalarFunction(InputGroupScalarFunction.class)
-                        .expectNamedArguments("o")
                         .expectOutputMapping(
                                 InputTypeStrategies.sequence(
                                         new String[] {"o"},
@@ -557,6 +556,7 @@ class TypeInferenceExtractorTest {
                 TestSpec.forScalarFunction(
                                 "Scalar function with arguments hints all 
missing name",
                                 ArgumentHintMissingNameScalarFunction.class)
+                        .expectNamedArguments("arg0", "arg1")
                         .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT()),
                 TestSpec.forScalarFunction(
                                 "Scalar function with arguments hints all 
missing partial name",
@@ -695,7 +695,6 @@ class TypeInferenceExtractorTest {
                                 InputTypeStrategies.sequence(
                                         
InputTypeStrategies.explicit(DataTypes.BIGINT())),
                                 TypeStrategies.explicit(DataTypes.INT())),
-
                 // no arguments
                 TestSpec.forProcedure(ZeroArgProcedure.class)
                         .expectNamedArguments()
@@ -704,7 +703,6 @@ class TypeInferenceExtractorTest {
                                 InputTypeStrategies.sequence(
                                         new String[0], new 
ArgumentTypeStrategy[0]),
                                 TypeStrategies.explicit(DataTypes.INT())),
-
                 // test primitive arguments extraction
                 TestSpec.forProcedure(MixedArgProcedure.class)
                         .expectNamedArguments("i", "d")
@@ -719,7 +717,6 @@ class TypeInferenceExtractorTest {
                                             
InputTypeStrategies.explicit(DataTypes.DOUBLE())
                                         }),
                                 TypeStrategies.explicit(DataTypes.INT())),
-
                 // test overloaded arguments extraction
                 TestSpec.forProcedure(OverloadedProcedure.class)
                         .expectOutputMapping(
@@ -739,7 +736,6 @@ class TypeInferenceExtractorTest {
                                         }),
                                 TypeStrategies.explicit(
                                         
DataTypes.BIGINT().notNull().bridgedTo(long.class))),
-
                 // test varying arguments extraction
                 TestSpec.forProcedure(VarArgProcedure.class)
                         .expectOutputMapping(
@@ -752,7 +748,6 @@ class TypeInferenceExtractorTest {
                                                     
DataTypes.INT().notNull().bridgedTo(int.class))
                                         }),
                                 TypeStrategies.explicit(DataTypes.STRING())),
-
                 // test varying arguments extraction with byte
                 TestSpec.forProcedure(VarArgWithByteProcedure.class)
                         .expectOutputMapping(
@@ -765,7 +760,6 @@ class TypeInferenceExtractorTest {
                                                             
.bridgedTo(byte.class))
                                         }),
                                 TypeStrategies.explicit(DataTypes.STRING())),
-
                 // output hint with input extraction
                 TestSpec.forProcedure(ExtractWithOutputHintProcedure.class)
                         .expectNamedArguments("i")
@@ -777,7 +771,6 @@ class TypeInferenceExtractorTest {
                                             
InputTypeStrategies.explicit(DataTypes.INT())
                                         }),
                                 TypeStrategies.explicit(DataTypes.INT())),
-
                 // output extraction with input hints
                 TestSpec.forProcedure(ExtractWithInputHintProcedure.class)
                         .expectNamedArguments("i", "b")
@@ -794,17 +787,14 @@ class TypeInferenceExtractorTest {
                 // named arguments with overloaded function
                 // expected no named argument for overloaded function
                 TestSpec.forProcedure(NamedArgumentsProcedure.class),
-
-                // scalar function that takes any input
+                // procedure function that takes any input
                 TestSpec.forProcedure(InputGroupProcedure.class)
-                        .expectNamedArguments("o")
                         .expectOutputMapping(
                                 InputTypeStrategies.sequence(
                                         new String[] {"o"},
                                         new ArgumentTypeStrategy[] 
{InputTypeStrategies.ANY}),
                                 TypeStrategies.explicit(DataTypes.STRING())),
-
-                // scalar function that takes any input as vararg
+                // procedure function that takes any input as vararg
                 TestSpec.forProcedure(VarArgInputGroupProcedure.class)
                         .expectOutputMapping(
                                 InputTypeStrategies.varyingSequence(
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 9a63dbb0cb4..d49e376107f 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -423,7 +423,7 @@ class InputTypeStrategiesTest extends 
InputTypeStrategiesTestBase {
                 TestSpec.forStrategy(WILDCARD)
                         .typedArguments(DataTypes.INT(), DataTypes.STRING())
                         .calledWithArgumentTypes(DataTypes.TINYINT(), 
DataTypes.STRING())
-                        .expectSignature("f(INT, STRING)")
+                        .expectSignature("f(arg0 => INT, arg1 => STRING)")
                         .expectArgumentTypes(DataTypes.INT(), 
DataTypes.STRING()),
 
                 // invalid typed arguments


Reply via email to