This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3ac109b0b4f [FLINK-36704][table-common] Update TypeInference with
StaticArgument and StateTypeStrategy
3ac109b0b4f is described below
commit 3ac109b0b4faea35bd58bd16e7c0657b80be65ac
Author: Timo Walther <[email protected]>
AuthorDate: Mon Nov 25 16:09:47 2024 +0100
[FLINK-36704][table-common] Update TypeInference with StaticArgument and
StateTypeStrategy
---
.../src/test/resources/sql/function.q | 26 +-
.../table/functions/BuiltInFunctionDefinition.java | 10 +
.../{TypeStrategy.java => StateTypeStrategy.java} | 19 +-
.../types/inference/StateTypeStrategyWrapper.java | 66 +++++
.../table/types/inference/StaticArgument.java | 165 +++++++++++
.../table/types/inference/StaticArgumentTrait.java | 67 +++++
.../flink/table/types/inference/TypeInference.java | 302 ++++++++++++++-------
.../flink/table/types/inference/TypeStrategy.java | 3 +-
.../extraction/TypeInferenceExtractorTest.java | 16 +-
.../types/inference/InputTypeStrategiesTest.java | 2 +-
10 files changed, 538 insertions(+), 138 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q
b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index 270ad9ff1f1..4cc0c043dc2 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -406,7 +406,7 @@ describe function extended temp_upperudf;
| requirements |
$VAR_UDF_JAR_PATH_SPACE [] |
| is deterministic |
$VAR_UDF_JAR_PATH_SPACE true |
| supports constant folding |
$VAR_UDF_JAR_PATH_SPACE true |
-| signature | $VAR_UDF_JAR_PATH_SPACE
c1.db.temp_upperudf(STRING) |
+| signature | $VAR_UDF_JAR_PATH_SPACE
c1.db.temp_upperudf(arg0 => STRING) |
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
10 rows in set
!ok
@@ -437,7 +437,7 @@ desc function extended temp_upperudf;
| requirements |
$VAR_UDF_JAR_PATH_SPACE [] |
| is deterministic |
$VAR_UDF_JAR_PATH_SPACE true |
| supports constant folding |
$VAR_UDF_JAR_PATH_SPACE true |
-| signature | $VAR_UDF_JAR_PATH_SPACE
c1.db.temp_upperudf(STRING) |
+| signature | $VAR_UDF_JAR_PATH_SPACE
c1.db.temp_upperudf(arg0 => STRING) |
+---------------------------+---------------------------------------------$VAR_UDF_JAR_PATH_DASH+
10 rows in set
!ok
@@ -489,16 +489,16 @@ describe function `c1`.`db`.temp_upperudf;
!ok
describe function extended temp_upperudf;
-+---------------------------+-----------------------+
-| info name | info value |
-+---------------------------+-----------------------+
-| is system function | true |
-| is temporary | true |
-| kind | SCALAR |
-| requirements | [] |
-| is deterministic | true |
-| supports constant folding | true |
-| signature | temp_upperudf(STRING) |
-+---------------------------+-----------------------+
++---------------------------+-------------------------------+
+| info name | info value |
++---------------------------+-------------------------------+
+| is system function | true |
+| is temporary | true |
+| kind | SCALAR |
+| requirements | [] |
+| is deterministic | true |
+| supports constant folding | true |
+| signature | temp_upperudf(arg0 => STRING) |
++---------------------------+-------------------------------+
7 rows in set
!ok
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
index fce8c4664fa..1f6a195ee45 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;
@@ -292,11 +293,20 @@ public final class BuiltInFunctionDefinition implements
SpecializedFunction {
return this;
}
+ public Builder staticArguments(StaticArgument... staticArguments) {
+ this.typeInferenceBuilder.staticArguments(staticArguments);
+ return this;
+ }
+
+ /** @deprecated Use {@link #staticArguments(StaticArgument...)}
instead. */
+ @Deprecated
public Builder namedArguments(String... argumentNames) {
this.typeInferenceBuilder.namedArguments(Arrays.asList(argumentNames));
return this;
}
+ /** @deprecated Use {@link #staticArguments(StaticArgument...)}
instead. */
+ @Deprecated
public Builder typedArguments(DataType... argumentTypes) {
this.typeInferenceBuilder.typedArguments(Arrays.asList(argumentTypes));
return this;
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
similarity index 63%
copy from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
index bdb6299f1f5..ae817054f0b 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategy.java
@@ -19,22 +19,9 @@
package org.apache.flink.table.types.inference;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.types.DataType;
-import java.util.Optional;
-
-/**
- * Strategy for inferring the data type of a function call. The inferred type
might describe the
- * final result or an intermediate result (accumulation type) of a function.
- *
- * <p>Note: Implementations should implement {@link Object#hashCode()} and
{@link
- * Object#equals(Object)}.
- *
- * @see TypeStrategies
- */
+/** Strategy for inferring a function call's intermediate result data type
(i.e. state entry). */
@PublicEvolving
-public interface TypeStrategy {
-
- /** Infers a type from the given function call. */
- Optional<DataType> inferType(CallContext callContext);
+public interface StateTypeStrategy extends TypeStrategy {
+ // marker interface which will be filled with additional contracts in the
future
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
new file mode 100644
index 00000000000..10d6b3b9f7c
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StateTypeStrategyWrapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/** A helper class that wraps a {@link TypeStrategy} into a {@link
StateTypeStrategy}. */
+@PublicEvolving
+public class StateTypeStrategyWrapper implements StateTypeStrategy {
+
+ private final TypeStrategy typeStrategy;
+
+ private StateTypeStrategyWrapper(TypeStrategy typeStrategy) {
+ this.typeStrategy =
+ Preconditions.checkNotNull(typeStrategy, "Type strategy must
not be null.");
+ }
+
+ public static StateTypeStrategyWrapper of(TypeStrategy typeStrategy) {
+ return new StateTypeStrategyWrapper(typeStrategy);
+ }
+
+ @Override
+ public Optional<DataType> inferType(CallContext callContext) {
+ return typeStrategy.inferType(callContext);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof StateTypeStrategyWrapper) {
+ return Objects.equals(typeStrategy, ((StateTypeStrategyWrapper)
o).typeStrategy);
+ }
+ if (o instanceof TypeStrategy) {
+ return Objects.equals(typeStrategy, o);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return typeStrategy.hashCode();
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
new file mode 100644
index 00000000000..ad791585c78
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.EnumSet;
+import java.util.Optional;
+
+/**
+ * Describes an argument in a static signature that is not overloaded and does
not support varargs.
+ *
+ * <p>Static arguments are a special case of an input type strategy. While
built-in functions often
+ * require advanced type inference strategies (taking data type families,
common type constraints
+ * between arguments, customized validation), many functions are fine with a
static signature.
+ * Static arguments power these basic use cases.
+ *
+ * <p>Static arguments can take tables, models, or scalar values. Each
argument takes a set of
+ * {@link StaticArgumentTrait} that enable basic validation by the framework.
+ */
+@PublicEvolving
+public class StaticArgument {
+
+ private final String name;
+ private final @Nullable DataType dataType;
+ private final @Nullable Class<?> conversionClass;
+ private final boolean isOptional;
+ private final EnumSet<StaticArgumentTrait> traits;
+
+ private StaticArgument(
+ String name,
+ @Nullable DataType dataType,
+ @Nullable Class<?> conversionClass,
+ boolean isOptional,
+ EnumSet<StaticArgumentTrait> traits) {
+ StaticArgumentTrait.checkIntegrity(
+ Preconditions.checkNotNull(traits, "Traits must not be
null."));
+ this.name = Preconditions.checkNotNull(name, "Name must not be null.");
+ this.dataType = dataType;
+ this.conversionClass = conversionClass;
+ this.isOptional = isOptional;
+ this.traits = traits;
+ }
+
+ /**
+ * Declares a scalar argument such as {@code f(12)} or {@code
f(otherColumn)}.
+ *
+ * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+ * @param dataType explicit type to which the argument is cast if necessary
+ * @param isOptional whether the argument is optional; if optional, the
corresponding data type
+ * must be nullable
+ */
+ public static StaticArgument scalar(String name, DataType dataType,
boolean isOptional) {
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ return new StaticArgument(
+ name, dataType, null, isOptional,
EnumSet.of(StaticArgumentTrait.SCALAR));
+ }
+
+ /**
+ * Declares a table argument such as {@code f(t => myTable)} or {@code f(t
=> TABLE myTable)}.
+ *
+ * <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW}
(default) or {@link
+ * StaticArgumentTrait#TABLE_AS_SET} semantics.
+ *
+ * <p>By only providing a conversion class, the argument supports a
"polymorphic" behavior. In
+ * other words: it accepts tables with an arbitrary number of columns with
arbitrary data types.
+ * For this case, a class satisfying {@link
RowType#supportsOutputConversion(Class)} must be
+ * used.
+ *
+ * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+ * @param conversionClass a class satisfying {@link
RowType#supportsOutputConversion(Class)}
+ * @param isOptional whether the argument is optional
+ * @param traits set of {@link StaticArgumentTrait} requiring {@link
StaticArgumentTrait#TABLE}
+ */
+ public static StaticArgument table(
+ String name,
+ Class<?> conversionClass,
+ boolean isOptional,
+ EnumSet<StaticArgumentTrait> traits) {
+ Preconditions.checkNotNull(conversionClass, "Conversion class must not
be null.");
+ final EnumSet<StaticArgumentTrait> enrichedTraits =
EnumSet.copyOf(traits);
+ enrichedTraits.add(StaticArgumentTrait.TABLE);
+ if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
+ enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
+ }
+ return new StaticArgument(name, null, conversionClass, isOptional,
enrichedTraits);
+ }
+
+ /**
+ * Declares a table argument such as {@code f(t => myTable)} or {@code f(t
=> TABLE myTable)}.
+ *
+ * <p>The argument can have {@link StaticArgumentTrait#TABLE_AS_ROW}
(default) or {@link
+ * StaticArgumentTrait#TABLE_AS_SET} semantics.
+ *
+ * <p>By providing a concrete data type, the argument only accepts tables
with corresponding
+ * number of columns and data types. The data type must be a {@link
RowType} or {@link
+ * StructuredType}.
+ *
+ * @param name name for the assignment operator e.g. {@code f(myArg => 12)}
+ * @param dataType explicit type to which the argument is cast if necessary
+ * @param isOptional whether the argument is optional; if optional, the
corresponding data type
+ * must be nullable
+ * @param traits set of {@link StaticArgumentTrait} requiring {@link
StaticArgumentTrait#TABLE}
+ */
+ public static StaticArgument table(
+ String name,
+ DataType dataType,
+ boolean isOptional,
+ EnumSet<StaticArgumentTrait> traits) {
+ Preconditions.checkNotNull(dataType, "Data type must not be null.");
+ return new StaticArgument(name, dataType, null, isOptional,
enrichTableTraits(traits));
+ }
+
+ private static EnumSet<StaticArgumentTrait> enrichTableTraits(
+ EnumSet<StaticArgumentTrait> traits) {
+ final EnumSet<StaticArgumentTrait> enrichedTraits =
EnumSet.copyOf(traits);
+ enrichedTraits.add(StaticArgumentTrait.TABLE);
+ if (!enrichedTraits.contains(StaticArgumentTrait.TABLE_AS_SET)) {
+ enrichedTraits.add(StaticArgumentTrait.TABLE_AS_ROW);
+ }
+ return enrichedTraits;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Optional<DataType> getDataType() {
+ return Optional.ofNullable(dataType);
+ }
+
+ public Optional<Class<?>> getConversionClass() {
+ return Optional.ofNullable(conversionClass);
+ }
+
+ public boolean isOptional() {
+ return isOptional;
+ }
+
+ public EnumSet<StaticArgumentTrait> getTraits() {
+ return traits;
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
new file mode 100644
index 00000000000..76a4e6e2690
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Declares traits for {@link StaticArgument}. They enable basic validation by
the framework.
+ *
+ * <p>Some traits have dependencies on other traits, which is why this enum
reflects a hierarchy in
+ * which {@link #SCALAR}, {@link #TABLE}, and {@link #MODEL} are the top-level
roots.
+ */
+@PublicEvolving
+public enum StaticArgumentTrait {
+ SCALAR(),
+ TABLE(),
+ MODEL(),
+ TABLE_AS_ROW(TABLE),
+ TABLE_AS_SET(TABLE),
+ OPTIONAL_PARTITION_BY(TABLE_AS_SET);
+
+ private final Set<StaticArgumentTrait> requirements;
+
+ StaticArgumentTrait(StaticArgumentTrait... requirements) {
+ this.requirements =
Arrays.stream(requirements).collect(Collectors.toSet());
+ }
+
+ public static void checkIntegrity(EnumSet<StaticArgumentTrait> traits) {
+ if (traits.stream().filter(t -> t.requirements.isEmpty()).count() !=
1) {
+ throw new ValidationException(
+ "Invalid argument traits. An argument must be declared as
either scalar, table, or model.");
+ }
+ traits.forEach(
+ trait ->
+ trait.requirements.forEach(
+ requirement -> {
+ if (!traits.contains(requirement)) {
+ throw new ValidationException(
+ String.format(
+ "Invalid argument
traits. Trait %s requires %s.",
+ trait, requirement));
+ }
+ }));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
index e7bdaf77a93..f302036fb57 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
@@ -25,17 +25,20 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Provides logic for the type inference of function calls. It includes:
*
* <ul>
- * <li>explicit input specification for (possibly named and/or typed)
arguments
+ * <li>explicit input specification for static arguments
* <li>inference of missing or incomplete input types
* <li>validation of input types
- * <li>inference of an intermediate accumulation type
+ * <li>inference of intermediate result types (i.e. state entries)
* <li>inference of the final output type
* </ul>
*
@@ -44,39 +47,20 @@ import java.util.Optional;
@PublicEvolving
public final class TypeInference {
- private final @Nullable List<String> namedArguments;
-
- private final @Nullable List<Boolean> optionalArguments;
-
- private final @Nullable List<DataType> typedArguments;
-
+ private final @Nullable List<StaticArgument> staticArguments;
private final InputTypeStrategy inputTypeStrategy;
-
- private final @Nullable TypeStrategy accumulatorTypeStrategy;
-
+ private final LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies;
private final TypeStrategy outputTypeStrategy;
private TypeInference(
- @Nullable List<String> namedArguments,
- @Nullable List<Boolean> optionalArguments,
- @Nullable List<DataType> typedArguments,
+ @Nullable List<StaticArgument> staticArguments,
InputTypeStrategy inputTypeStrategy,
- @Nullable TypeStrategy accumulatorTypeStrategy,
+ LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies,
TypeStrategy outputTypeStrategy) {
- this.namedArguments = namedArguments;
- this.optionalArguments = optionalArguments;
- this.typedArguments = typedArguments;
+ this.staticArguments = staticArguments;
this.inputTypeStrategy = inputTypeStrategy;
- this.accumulatorTypeStrategy = accumulatorTypeStrategy;
+ this.stateTypeStrategies = stateTypeStrategies;
this.outputTypeStrategy = outputTypeStrategy;
- if (namedArguments != null
- && typedArguments != null
- && namedArguments.size() != typedArguments.size()) {
- throw new IllegalArgumentException(
- String.format(
- "Mismatch between typed arguments %d and named
argument %d.",
- namedArguments.size(), typedArguments.size()));
- }
}
/** Builder for configuring and creating instances of {@link
TypeInference}. */
@@ -84,50 +68,177 @@ public final class TypeInference {
return new TypeInference.Builder();
}
- public Optional<List<String>> getNamedArguments() {
- return Optional.ofNullable(namedArguments);
- }
-
- public Optional<List<DataType>> getTypedArguments() {
- return Optional.ofNullable(typedArguments);
- }
-
- public Optional<List<Boolean>> getOptionalArguments() {
- return Optional.ofNullable(optionalArguments);
+ public Optional<List<StaticArgument>> getStaticArguments() {
+ return Optional.ofNullable(staticArguments);
}
public InputTypeStrategy getInputTypeStrategy() {
return inputTypeStrategy;
}
- public Optional<TypeStrategy> getAccumulatorTypeStrategy() {
- return Optional.ofNullable(accumulatorTypeStrategy);
+ public LinkedHashMap<String, StateTypeStrategy> getStateTypeStrategies() {
+ return stateTypeStrategies;
}
public TypeStrategy getOutputTypeStrategy() {
return outputTypeStrategy;
}
+ /** @deprecated Use {@link #getStaticArguments()} instead. */
+ @Deprecated
+ public Optional<List<String>> getNamedArguments() {
+ return Optional.ofNullable(staticArguments)
+ .map(
+ args ->
+ args.stream()
+ .map(StaticArgument::getName)
+ .collect(Collectors.toList()));
+ }
+
+ /** @deprecated Use {@link #getStaticArguments()} instead. */
+ @Deprecated
+ public Optional<List<DataType>> getTypedArguments() {
+ return Optional.ofNullable(staticArguments)
+ .map(
+ args ->
+ args.stream()
+ .map(
+ arg ->
+ arg.getDataType()
+ .orElseThrow(
+ () ->
+
new IllegalArgumentException(
+
"Scalar argument with a data type expected.")))
+ .collect(Collectors.toList()));
+ }
+
+ /** @deprecated Use {@link #getStaticArguments()} instead. */
+ @Deprecated
+ public Optional<List<Boolean>> getOptionalArguments() {
+ return Optional.ofNullable(staticArguments)
+ .map(
+ args ->
+ args.stream()
+ .map(StaticArgument::isOptional)
+ .collect(Collectors.toList()));
+ }
+
+ /** @deprecated Use {@link #getStateTypeStrategies()} instead. */
+ @Deprecated
+ public Optional<TypeStrategy> getAccumulatorTypeStrategy() {
+ if (stateTypeStrategies.isEmpty()) {
+ return Optional.empty();
+ }
+ if (stateTypeStrategies.size() != 1) {
+ throw new IllegalArgumentException(
+ "An accumulator should contain exactly one state type
strategy.");
+ }
+ return Optional.of(stateTypeStrategies.values().iterator().next());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Builder
//
--------------------------------------------------------------------------------------------
/** Builder for configuring and creating instances of {@link
TypeInference}. */
@PublicEvolving
public static class Builder {
- private @Nullable List<String> namedArguments;
+ private @Nullable List<StaticArgument> staticArguments;
+ private InputTypeStrategy inputTypeStrategy =
InputTypeStrategies.WILDCARD;
+ private LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies =
+ new LinkedHashMap<>();
+ private @Nullable TypeStrategy outputTypeStrategy;
+ // Legacy
+ private @Nullable List<String> namedArguments;
private @Nullable List<Boolean> optionalArguments;
-
private @Nullable List<DataType> typedArguments;
- private InputTypeStrategy inputTypeStrategy =
InputTypeStrategies.WILDCARD;
+ public Builder() {
+ // default constructor to allow a fluent definition
+ }
- private @Nullable TypeStrategy accumulatorTypeStrategy;
+ /**
+ * Sets a list of arguments in a static signature that is not
overloaded and does not
+ * support varargs.
+ *
+ * <p>Static arguments are a special case of an input type strategy
and take precedence. A
+ * signature can take tables, models, or scalar values. It allows
optional and/or named
+ * arguments like {@code f(myArg => 12)}.
+ */
+ public Builder staticArguments(StaticArgument... staticArguments) {
+ this.staticArguments = Arrays.asList(staticArguments);
+ return this;
+ }
- private @Nullable TypeStrategy outputTypeStrategy;
+ /**
+ * Sets a list of arguments in a static signature that is not
overloaded and does not
+ * support varargs.
+ *
+ * <p>Static arguments are a special case of an input type strategy
and take precedence. A
+ * signature can take tables, models, or scalar values. It allows
optional and/or named
+ * arguments like {@code f(myArg => 12)}.
+ */
+ public Builder staticArguments(List<StaticArgument> staticArgument) {
+ this.staticArguments = staticArgument;
+ return this;
+ }
- public Builder() {
- // default constructor to allow a fluent definition
+ /**
+ * Sets the strategy for inferring and validating input arguments in a
function call.
+ *
+ * <p>A {@link InputTypeStrategies#WILDCARD} strategy function is
assumed by default.
+ */
+ public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) {
+ this.inputTypeStrategy =
+ Preconditions.checkNotNull(
+ inputTypeStrategy, "Input type strategy must not
be null.");
+ return this;
+ }
+
+ /**
+ * Sets the strategy for inferring the intermediate accumulator data
type of an aggregate
+ * function call.
+ */
+ public Builder accumulatorTypeStrategy(TypeStrategy
accumulatorTypeStrategy) {
+ Preconditions.checkNotNull(
+ accumulatorTypeStrategy, "Accumulator type strategy must
not be null.");
+ this.stateTypeStrategies.put(
+ "acc",
StateTypeStrategyWrapper.of(accumulatorTypeStrategy));
+ return this;
+ }
+
+ /**
+ * Sets a map of state names to {@link StateTypeStrategy}s for
inferring a function call's
+ * intermediate result data types (i.e. state entries). For aggregate
functions, only one
+ * entry is allowed which defines the accumulator's data type.
+ */
+ public Builder stateTypeStrategies(
+ LinkedHashMap<String, StateTypeStrategy> stateTypeStrategies) {
+ this.stateTypeStrategies = stateTypeStrategies;
+ return this;
+ }
+
+ /**
+ * Sets the strategy for inferring the final output data type of a
function call.
+ *
+ * <p>Required.
+ */
+ public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
+ this.outputTypeStrategy =
+ Preconditions.checkNotNull(
+ outputTypeStrategy, "Output type strategy must not
be null.");
+ return this;
+ }
+
+ public TypeInference build() {
+ return new TypeInference(
+ createStaticArguments(),
+ inputTypeStrategy,
+ stateTypeStrategies,
+ Preconditions.checkNotNull(
+ outputTypeStrategy, "Output type strategy must not
be null."));
}
/**
@@ -137,7 +248,10 @@ public final class TypeInference {
* <p>This information is useful for SQL's concept of named arguments
using the assignment
* operator (e.g. {@code FUNC(max => 42)}). The names are used for
reordering the call's
* arguments to the formal argument order of the function.
+ *
+ * @deprecated Use {@link #staticArguments(List)} instead.
*/
+ @Deprecated
public Builder namedArguments(List<String> argumentNames) {
this.namedArguments =
Preconditions.checkNotNull(
@@ -145,7 +259,11 @@ public final class TypeInference {
return this;
}
- /** @see #namedArguments(List) */
+ /**
+ * @see #namedArguments(List)
+ * @deprecated Use {@link #staticArguments(StaticArgument...)} instead.
+ */
+ @Deprecated
public Builder namedArguments(String... argumentNames) {
return namedArguments(Arrays.asList(argumentNames));
}
@@ -157,7 +275,10 @@ public final class TypeInference {
* <p>This information is useful for SQL's concept of named arguments
using the assignment
* operator. The optionals are used to determine whether an argument
is optional or required
* in the function call.
+ *
+ * @deprecated Use {@link #staticArguments(List)} instead.
*/
+ @Deprecated
public Builder optionalArguments(List<Boolean> optionalArguments) {
this.optionalArguments =
Preconditions.checkNotNull(
@@ -171,8 +292,10 @@ public final class TypeInference {
*
* <p>This information is useful for optional arguments with default
value. In particular,
* the number of arguments that need to be filled with a default value
and their types is
- * important.
+ *
+ * @deprecated Use {@link #staticArguments(List)} instead.
*/
+ @Deprecated
public Builder typedArguments(List<DataType> argumentTypes) {
this.typedArguments =
Preconditions.checkNotNull(
@@ -180,55 +303,48 @@ public final class TypeInference {
return this;
}
- /** @see #typedArguments(List) */
- public Builder typedArguments(DataType... argumentTypes) {
- return typedArguments(Arrays.asList(argumentTypes));
- }
-
- /**
- * Sets the strategy for inferring and validating input arguments in a
function call.
- *
- * <p>A {@link InputTypeStrategies#WILDCARD} strategy function is
assumed by default.
- */
- public Builder inputTypeStrategy(InputTypeStrategy inputTypeStrategy) {
- this.inputTypeStrategy =
- Preconditions.checkNotNull(
- inputTypeStrategy, "Input type strategy must not
be null.");
- return this;
- }
-
- /**
- * Sets the strategy for inferring the intermediate accumulator data
type of a function
- * call.
- */
- public Builder accumulatorTypeStrategy(TypeStrategy
accumulatorTypeStrategy) {
- this.accumulatorTypeStrategy =
- Preconditions.checkNotNull(
- accumulatorTypeStrategy, "Accumulator type
strategy must not be null.");
- return this;
- }
-
/**
- * Sets the strategy for inferring the final output data type of a
function call.
- *
- * <p>Required.
+ * @see #typedArguments(List)
+ * @deprecated Use {@link #staticArguments(StaticArgument...)} instead.
*/
- public Builder outputTypeStrategy(TypeStrategy outputTypeStrategy) {
- this.outputTypeStrategy =
- Preconditions.checkNotNull(
- outputTypeStrategy, "Output type strategy must not
be null.");
- return this;
+ @Deprecated
+ public Builder typedArguments(DataType... argumentTypes) {
+ return typedArguments(Arrays.asList(argumentTypes));
}
- public TypeInference build() {
- return new TypeInference(
- namedArguments,
- optionalArguments,
- typedArguments,
- inputTypeStrategy,
- accumulatorTypeStrategy,
- Preconditions.checkNotNull(
- outputTypeStrategy, "Output type strategy must not
be null."));
+ private @Nullable List<StaticArgument> createStaticArguments() {
+ if (staticArguments != null) {
+ return staticArguments;
+ }
+ // Legacy path
+ if (typedArguments != null) {
+ if (namedArguments != null && namedArguments.size() != typedArguments.size()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Mismatch between typed arguments %d and named arguments %d.",
+ typedArguments.size(), namedArguments.size()));
+ }
+ if (optionalArguments != null
+ && optionalArguments.size() != typedArguments.size()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Mismatch between typed arguments %d and optional arguments %d.",
+ typedArguments.size(), optionalArguments.size()));
+ }
+ return IntStream.range(0, typedArguments.size())
+ .mapToObj(
+ pos ->
+ StaticArgument.scalar(
+ Optional.ofNullable(namedArguments)
+ .map(args -> args.get(pos))
+ .orElse("arg" + pos),
+ typedArguments.get(pos),
+ Optional.ofNullable(optionalArguments)
+ .map(args -> args.get(pos))
+ .orElse(false)))
+ .collect(Collectors.toList());
+ }
+ return null;
}
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
index bdb6299f1f5..5b7924e1653 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategy.java
@@ -24,8 +24,7 @@ import org.apache.flink.table.types.DataType;
import java.util.Optional;
/**
- * Strategy for inferring the data type of a function call. The inferred type
might describe the
- * final result or an intermediate result (accumulation type) of a function.
+ * Strategy for inferring a function call's result data type.
*
* <p>Note: Implementations should implement {@link Object#hashCode()} and
{@link
* Object#equals(Object)}.
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
index dc3b36c9320..a438ef86cc0 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
@@ -433,7 +433,6 @@ class TypeInferenceExtractorTest {
// scalar function that takes any input
TestSpec.forScalarFunction(InputGroupScalarFunction.class)
- .expectNamedArguments("o")
.expectOutputMapping(
InputTypeStrategies.sequence(
new String[] {"o"},
@@ -557,6 +556,7 @@ class TypeInferenceExtractorTest {
TestSpec.forScalarFunction(
"Scalar function with arguments hints all missing name",
ArgumentHintMissingNameScalarFunction.class)
+ .expectNamedArguments("arg0", "arg1")
.expectTypedArguments(DataTypes.STRING(),
DataTypes.INT()),
TestSpec.forScalarFunction(
"Scalar function with arguments hints all missing partial name",
@@ -695,7 +695,6 @@ class TypeInferenceExtractorTest {
InputTypeStrategies.sequence(
InputTypeStrategies.explicit(DataTypes.BIGINT())),
TypeStrategies.explicit(DataTypes.INT())),
-
// no arguments
TestSpec.forProcedure(ZeroArgProcedure.class)
.expectNamedArguments()
@@ -704,7 +703,6 @@ class TypeInferenceExtractorTest {
InputTypeStrategies.sequence(
new String[0], new
ArgumentTypeStrategy[0]),
TypeStrategies.explicit(DataTypes.INT())),
-
// test primitive arguments extraction
TestSpec.forProcedure(MixedArgProcedure.class)
.expectNamedArguments("i", "d")
@@ -719,7 +717,6 @@ class TypeInferenceExtractorTest {
InputTypeStrategies.explicit(DataTypes.DOUBLE())
}),
TypeStrategies.explicit(DataTypes.INT())),
-
// test overloaded arguments extraction
TestSpec.forProcedure(OverloadedProcedure.class)
.expectOutputMapping(
@@ -739,7 +736,6 @@ class TypeInferenceExtractorTest {
}),
TypeStrategies.explicit(
DataTypes.BIGINT().notNull().bridgedTo(long.class))),
-
// test varying arguments extraction
TestSpec.forProcedure(VarArgProcedure.class)
.expectOutputMapping(
@@ -752,7 +748,6 @@ class TypeInferenceExtractorTest {
DataTypes.INT().notNull().bridgedTo(int.class))
}),
TypeStrategies.explicit(DataTypes.STRING())),
-
// test varying arguments extraction with byte
TestSpec.forProcedure(VarArgWithByteProcedure.class)
.expectOutputMapping(
@@ -765,7 +760,6 @@ class TypeInferenceExtractorTest {
.bridgedTo(byte.class))
}),
TypeStrategies.explicit(DataTypes.STRING())),
-
// output hint with input extraction
TestSpec.forProcedure(ExtractWithOutputHintProcedure.class)
.expectNamedArguments("i")
@@ -777,7 +771,6 @@ class TypeInferenceExtractorTest {
InputTypeStrategies.explicit(DataTypes.INT())
}),
TypeStrategies.explicit(DataTypes.INT())),
-
// output extraction with input hints
TestSpec.forProcedure(ExtractWithInputHintProcedure.class)
.expectNamedArguments("i", "b")
@@ -794,17 +787,14 @@ class TypeInferenceExtractorTest {
// named arguments with overloaded function
// expected no named argument for overloaded function
TestSpec.forProcedure(NamedArgumentsProcedure.class),
-
- // scalar function that takes any input
+ // procedure function that takes any input
TestSpec.forProcedure(InputGroupProcedure.class)
- .expectNamedArguments("o")
.expectOutputMapping(
InputTypeStrategies.sequence(
new String[] {"o"},
new ArgumentTypeStrategy[]
{InputTypeStrategies.ANY}),
TypeStrategies.explicit(DataTypes.STRING())),
-
- // scalar function that takes any input as vararg
+ // procedure function that takes any input as vararg
TestSpec.forProcedure(VarArgInputGroupProcedure.class)
.expectOutputMapping(
InputTypeStrategies.varyingSequence(
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 9a63dbb0cb4..d49e376107f 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -423,7 +423,7 @@ class InputTypeStrategiesTest extends
InputTypeStrategiesTestBase {
TestSpec.forStrategy(WILDCARD)
.typedArguments(DataTypes.INT(), DataTypes.STRING())
.calledWithArgumentTypes(DataTypes.TINYINT(),
DataTypes.STRING())
- .expectSignature("f(INT, STRING)")
+ .expectSignature("f(arg0 => INT, arg1 => STRING)")
.expectArgumentTypes(DataTypes.INT(),
DataTypes.STRING()),
// invalid typed arguments