twalthr commented on a change in pull request #12410:
URL: https://github.com/apache/flink/pull/12410#discussion_r434557424
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -23,37 +23,47 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.table.types.utils.TypeConversions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
- * {@link InputTypeStrategy} specific for {@link
org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
- *
- * <p>It expects at least one argument. All the arguments must have a common
super type.
+ * An {@link InputTypeStrategy} that expects that all arguments have a common
type.
*/
@Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public class SameArgumentsInputTypeStrategy implements InputTypeStrategy {
Review comment:
nit: call it `CommonInputTypeStrategy`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -63,13 +73,40 @@ public ArgumentCount getArgumentCount() {
.collect(Collectors.toList())
);
+ if (!commonType.isPresent()) {
+ throw callContext.newValidationError("Could not find a
common type for arguments: %s", argumentDataTypes);
+ }
+
return commonType.map(type -> Collections.nCopies(
argumentDataTypes.size(),
TypeConversions.fromLogicalToDataType(type)));
}
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
- return
Collections.singletonList(Signature.of(Signature.Argument.of("*")));
+ Optional<Integer> minCount = argumentCount.getMinCount();
+ Optional<Integer> maxCount = argumentCount.getMaxCount();
+
+ int numberOfMandatoryArguments = 0;
Review comment:
nit: I'm a fan of immutable variables, can't we just do:
```
argumentCount.getMinCount().orElse(0)
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
+ this.argumentsSplits = argumentsSplits;
+ this.argumentCount = argumentCount;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext,
+ boolean throwOnFailure) {
+
+ List<DataType> result = new ArrayList<>();
+
+ for (ArgumentsSplit argumentsSplit : argumentsSplits) {
+ Optional<List<DataType>> splitDataTypes =
argumentsSplit.inputTypeStrategy
+ .inferInputTypes(new
SplitCallContext(argumentsSplit, callContext), throwOnFailure);
+
+ if (splitDataTypes.isPresent()) {
+ result.addAll(splitDataTypes.get());
+ } else {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Could not infer arguments in
range: [%d, %d].",
+ argumentsSplit.startIndex,
+ argumentsSplit.endIndex);
+ }
+ return Optional.empty();
+ }
+ }
+
+ return Optional.of(result);
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return argumentsSplits.stream()
+ .map(computeSubSignatures(definition))
+ .reduce(this::crossJoin)
+ .get()
+ .stream().map(Signature::of)
+ .collect(Collectors.toList());
+ }
+
+ private Function<ArgumentsSplit, List<List<Signature.Argument>>>
computeSubSignatures(
+ FunctionDefinition definition) {
+ return argumentsSplit ->
argumentsSplit.inputTypeStrategy.getExpectedSignatures(definition)
+ .stream()
+ .map(Signature::getArguments)
+ .collect(Collectors.toList());
+ }
+
+ private List<List<Signature.Argument>> crossJoin(
+ List<List<Signature.Argument>> signatureStarts,
+ List<List<Signature.Argument>> signatureEnds) {
+ return signatureStarts.stream()
+ .flatMap(x ->
+ signatureEnds.stream().map(y -> {
+ ArrayList<Signature.Argument> joined = new
ArrayList<>(x);
+ joined.addAll(y);
+ return joined;
+ }))
+ .collect(Collectors.toList());
+ }
+
+ private static final class ArgumentsSplit {
+ private final int startIndex;
+ private final @Nullable Integer endIndex;
+
+ private final InputTypeStrategy inputTypeStrategy;
+
+ public ArgumentsSplit(
+ int startIndex,
+ Integer endIndex,
+ InputTypeStrategy inputTypeStrategy) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.inputTypeStrategy = inputTypeStrategy;
+ }
+ }
+
+ private static final class SplitCallContext implements CallContext {
+
+ private final ArgumentsSplit split;
+ private final CallContext originalCallContext;
+
+ private SplitCallContext(
+ ArgumentsSplit split,
+ CallContext originalCallContext) {
+ this.split = split;
+ this.originalCallContext = originalCallContext;
+ }
+
+ @Override
+ public DataTypeFactory getDataTypeFactory() {
+ return originalCallContext.getDataTypeFactory();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return originalCallContext.getFunctionDefinition();
+ }
+
+ @Override
+ public boolean isArgumentLiteral(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public boolean isArgumentNull(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public <T> Optional<T> getArgumentValue(int pos, Class<T>
clazz) {
+ return originalCallContext.getArgumentValue(pos +
split.startIndex, clazz);
+ }
+
+ @Override
+ public String getName() {
+ return originalCallContext.getName();
+ }
+
+ @Override
+ public List<DataType> getArgumentDataTypes() {
+ List<DataType> originalArgumentDataTypes =
originalCallContext.getArgumentDataTypes();
+ return originalArgumentDataTypes.subList(
+ split.startIndex,
+ split.endIndex != null ? split.endIndex :
originalArgumentDataTypes.size());
+ }
+
+ @Override
+ public Optional<DataType> getOutputDataType() {
+ return originalCallContext.getOutputDataType();
+ }
+ }
+
+ /**
+ * A Builder for {@link SubSequenceInputTypeStrategy}.
+ */
+ @PublicEvolving
+ public static class SubSequenceStrategyBuilder {
+ private final List<ArgumentsSplit> argumentsSplits = new
ArrayList<>();
+ private int currentPos = 0;
+
+ /**
+ * Defines that we expect a single argument at the next
position.
+ */
+ public SubSequenceStrategyBuilder argument(ArgumentTypeStrategy
argumentTypeStrategy) {
+ SequenceInputTypeStrategy singleArgumentStrategy = new
SequenceInputTypeStrategy(
+
Collections.singletonList(argumentTypeStrategy), null);
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
currentPos + 1, singleArgumentStrategy));
+ currentPos += 1;
+ return this;
+ }
+
+ /**
+ * Defines a common {@link InputTypeStrategy} for the next
arguments. Given
+ * input strategy must expect a constant number of arguments.
That means that both
+ * the minimum and maximum number of arguments must be defined
and equal to each other.
+ *
+ * <p>If you need a varying logic use {@link
#finishWithvarying(InputTypeStrategy)}.
+ */
+ public SubSequenceStrategyBuilder subSequence(InputTypeStrategy
inputTypeStrategy) {
+ Optional<Integer> maxCount =
inputTypeStrategy.getArgumentCount().getMaxCount();
Review comment:
add a precondition to check for `ConstantArgumentCount`
##########
File path:
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/SubSequenceInputTypeStrategyTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.table.api.DataTypes;
+import
org.apache.flink.table.types.inference.strategies.SubSequenceInputTypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
+
+/**
+ * Tests for {@link SubSequenceInputTypeStrategy}
+ */
+public class SubSequenceInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static List<TestSpec> testData() {
+ return asList(
+ TestSpec
+ .forStrategy(
+ "A strategy used for IF ELSE with valid
arguments",
+ InputTypeStrategies.startSequences()
+
.argument(logical(LogicalTypeRoot.BOOLEAN))
+ .subSequence(commonType(2))
+ .finish()
+ )
+ .calledWithArgumentTypes(
+ DataTypes.BOOLEAN(),
+ DataTypes.SMALLINT(),
+ DataTypes.DECIMAL(10, 2)
+ )
+ .expectSignature(
+ "f(<BOOLEAN>, <COMMON>, <COMMON>)")
+ .expectArgumentTypes(
+ DataTypes.BOOLEAN(),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.DECIMAL(10, 2)
+ ),
+
+ TestSpec
+ .forStrategy(
+ "Strategy fails if any of the nested
strategies fail",
+ InputTypeStrategies.startSequences()
+
.argument(logical(LogicalTypeRoot.BOOLEAN))
+ .subSequence(commonType(2))
+ .finish()
+ )
+ .calledWithArgumentTypes(
+ DataTypes.BOOLEAN(),
+ DataTypes.VARCHAR(3),
+ DataTypes.DECIMAL(10, 2)
+ )
+ .expectErrorMessage("Could not find a common
type for arguments: [VARCHAR(3), DECIMAL(10, 2)]"),
+
+ TestSpec
+ .forStrategy(
+ "Strategy with a varying argument",
+ InputTypeStrategies.startSequences()
+
.argument(logical(LogicalTypeRoot.BOOLEAN))
+ .subSequence(commonType(2))
Review comment:
Can we have one more sophisticated test where another subsequence and
another argument follow here?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
Review comment:
use `ConstantArgumentCount`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
+ this.argumentsSplits = argumentsSplits;
+ this.argumentCount = argumentCount;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext,
+ boolean throwOnFailure) {
+
+ List<DataType> result = new ArrayList<>();
+
+ for (ArgumentsSplit argumentsSplit : argumentsSplits) {
+ Optional<List<DataType>> splitDataTypes =
argumentsSplit.inputTypeStrategy
+ .inferInputTypes(new
SplitCallContext(argumentsSplit, callContext), throwOnFailure);
+
+ if (splitDataTypes.isPresent()) {
+ result.addAll(splitDataTypes.get());
+ } else {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Could not infer arguments in
range: [%d, %d].",
+ argumentsSplit.startIndex,
+ argumentsSplit.endIndex);
+ }
+ return Optional.empty();
+ }
+ }
+
+ return Optional.of(result);
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return argumentsSplits.stream()
+ .map(computeSubSignatures(definition))
+ .reduce(this::crossJoin)
+ .get()
+ .stream().map(Signature::of)
+ .collect(Collectors.toList());
+ }
+
+ private Function<ArgumentsSplit, List<List<Signature.Argument>>>
computeSubSignatures(
+ FunctionDefinition definition) {
+ return argumentsSplit ->
argumentsSplit.inputTypeStrategy.getExpectedSignatures(definition)
+ .stream()
+ .map(Signature::getArguments)
+ .collect(Collectors.toList());
+ }
+
+ private List<List<Signature.Argument>> crossJoin(
+ List<List<Signature.Argument>> signatureStarts,
+ List<List<Signature.Argument>> signatureEnds) {
+ return signatureStarts.stream()
+ .flatMap(x ->
+ signatureEnds.stream().map(y -> {
+ ArrayList<Signature.Argument> joined = new
ArrayList<>(x);
+ joined.addAll(y);
+ return joined;
+ }))
+ .collect(Collectors.toList());
+ }
+
+ private static final class ArgumentsSplit {
+ private final int startIndex;
+ private final @Nullable Integer endIndex;
+
+ private final InputTypeStrategy inputTypeStrategy;
+
+ public ArgumentsSplit(
+ int startIndex,
+ Integer endIndex,
+ InputTypeStrategy inputTypeStrategy) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.inputTypeStrategy = inputTypeStrategy;
+ }
+ }
+
+ private static final class SplitCallContext implements CallContext {
+
+ private final ArgumentsSplit split;
+ private final CallContext originalCallContext;
+
+ private SplitCallContext(
+ ArgumentsSplit split,
+ CallContext originalCallContext) {
+ this.split = split;
+ this.originalCallContext = originalCallContext;
+ }
+
+ @Override
+ public DataTypeFactory getDataTypeFactory() {
+ return originalCallContext.getDataTypeFactory();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return originalCallContext.getFunctionDefinition();
+ }
+
+ @Override
+ public boolean isArgumentLiteral(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public boolean isArgumentNull(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public <T> Optional<T> getArgumentValue(int pos, Class<T>
clazz) {
+ return originalCallContext.getArgumentValue(pos +
split.startIndex, clazz);
+ }
+
+ @Override
+ public String getName() {
+ return originalCallContext.getName();
+ }
+
+ @Override
+ public List<DataType> getArgumentDataTypes() {
+ List<DataType> originalArgumentDataTypes =
originalCallContext.getArgumentDataTypes();
+ return originalArgumentDataTypes.subList(
+ split.startIndex,
+ split.endIndex != null ? split.endIndex :
originalArgumentDataTypes.size());
+ }
+
+ @Override
+ public Optional<DataType> getOutputDataType() {
+ return originalCallContext.getOutputDataType();
+ }
+ }
+
+ /**
+ * A Builder for {@link SubSequenceInputTypeStrategy}.
+ */
+ @PublicEvolving
+ public static class SubSequenceStrategyBuilder {
+ private final List<ArgumentsSplit> argumentsSplits = new
ArrayList<>();
+ private int currentPos = 0;
+
+ /**
+ * Defines that we expect a single argument at the next
position.
+ */
+ public SubSequenceStrategyBuilder argument(ArgumentTypeStrategy
argumentTypeStrategy) {
+ SequenceInputTypeStrategy singleArgumentStrategy = new
SequenceInputTypeStrategy(
+
Collections.singletonList(argumentTypeStrategy), null);
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
currentPos + 1, singleArgumentStrategy));
+ currentPos += 1;
+ return this;
+ }
+
+ /**
+ * Defines a common {@link InputTypeStrategy} for the next
arguments. Given
+ * input strategy must expect a constant number of arguments.
That means that both
+ * the minimum and maximum number of arguments must be defined
and equal to each other.
+ *
+ * <p>If you need a varying logic use {@link
#finishWithvarying(InputTypeStrategy)}.
+ */
+ public SubSequenceStrategyBuilder subSequence(InputTypeStrategy
inputTypeStrategy) {
+ Optional<Integer> maxCount =
inputTypeStrategy.getArgumentCount().getMaxCount();
+ Optional<Integer> minCount =
inputTypeStrategy.getArgumentCount().getMinCount();
+ if (!maxCount.isPresent() || !minCount.isPresent() ||
!maxCount.get().equals(minCount.get())) {
+ throw new IllegalArgumentException("Both the
minimum and maximum number of expected arguments must" +
+ " be defined and equal to each other.");
+ }
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
currentPos + maxCount.get(), inputTypeStrategy));
+ currentPos += maxCount.get();
+ return this;
+ }
+
+ /**
+ * Defines a common {@link InputTypeStrategy} for the next
arguments. Given
+ * input strategy must expect a varying number of arguments.
That means that the
+ * maximum number of arguments must not be defined.
+ */
+ public InputTypeStrategy finishWithvarying(InputTypeStrategy
inputTypeStrategy) {
Review comment:
Typo in the method name: it should be `finishWithVarying` (capital `V`).
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -23,37 +23,47 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.table.types.utils.TypeConversions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
- * {@link InputTypeStrategy} specific for {@link
org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
- *
- * <p>It expects at least one argument. All the arguments must have a common
super type.
+ * An {@link InputTypeStrategy} that expects that all arguments have a common
type.
*/
@Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public class SameArgumentsInputTypeStrategy implements InputTypeStrategy {
+ private static final Signature.Argument COMMON_ARGUMENT =
Signature.Argument.of("<COMMON>");
+ private final ArgumentCount argumentCount;
+
+ public SameArgumentsInputTypeStrategy(ArgumentCount argumentCount) {
+ this.argumentCount = argumentCount;
+ }
+
@Override
public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.from(1);
+ return argumentCount;
}
@Override
public Optional<List<DataType>> inferInputTypes(
- CallContext callContext,
- boolean throwOnFailure) {
+ CallContext callContext,
+ boolean throwOnFailure) {
List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
- if (argumentDataTypes.size() == 0) {
- return Optional.empty();
+
+ if (callContext.getArgumentDataTypes()
Review comment:
Use the `argumentDataTypes` variable declared above instead of calling `callContext.getArgumentDataTypes()` again.
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -23,37 +23,47 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.table.types.utils.TypeConversions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
- * {@link InputTypeStrategy} specific for {@link
org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
- *
- * <p>It expects at least one argument. All the arguments must have a common
super type.
+ * An {@link InputTypeStrategy} that expects that all arguments have a common
type.
*/
@Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public class SameArgumentsInputTypeStrategy implements InputTypeStrategy {
+ private static final Signature.Argument COMMON_ARGUMENT =
Signature.Argument.of("<COMMON>");
+ private final ArgumentCount argumentCount;
+
+ public SameArgumentsInputTypeStrategy(ArgumentCount argumentCount) {
Review comment:
Use `ConstantArgumentCount`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
##########
@@ -61,6 +62,17 @@
*/
public static final WildcardInputTypeStrategy WILDCARD = new
WildcardInputTypeStrategy();
+ /**
+ * An strategy that lets you apply other strategies for subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most
of the cases. Use this strategy
Review comment:
nit: link to the static method instead
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
+ this.argumentsSplits = argumentsSplits;
+ this.argumentCount = argumentCount;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext,
+ boolean throwOnFailure) {
+
+ List<DataType> result = new ArrayList<>();
+
+ for (ArgumentsSplit argumentsSplit : argumentsSplits) {
+ Optional<List<DataType>> splitDataTypes =
argumentsSplit.inputTypeStrategy
+ .inferInputTypes(new
SplitCallContext(argumentsSplit, callContext), throwOnFailure);
+
+ if (splitDataTypes.isPresent()) {
+ result.addAll(splitDataTypes.get());
+ } else {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Could not infer arguments in
range: [%d, %d].",
+ argumentsSplit.startIndex,
+ argumentsSplit.endIndex);
+ }
+ return Optional.empty();
+ }
+ }
+
+ return Optional.of(result);
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return argumentsSplits.stream()
+ .map(computeSubSignatures(definition))
+ .reduce(this::crossJoin)
+ .get()
+ .stream().map(Signature::of)
+ .collect(Collectors.toList());
+ }
+
+ private Function<ArgumentsSplit, List<List<Signature.Argument>>>
computeSubSignatures(
+ FunctionDefinition definition) {
+ return argumentsSplit ->
argumentsSplit.inputTypeStrategy.getExpectedSignatures(definition)
+ .stream()
+ .map(Signature::getArguments)
+ .collect(Collectors.toList());
+ }
+
+ private List<List<Signature.Argument>> crossJoin(
+ List<List<Signature.Argument>> signatureStarts,
+ List<List<Signature.Argument>> signatureEnds) {
+ return signatureStarts.stream()
+ .flatMap(x ->
+ signatureEnds.stream().map(y -> {
+ ArrayList<Signature.Argument> joined = new
ArrayList<>(x);
+ joined.addAll(y);
+ return joined;
+ }))
+ .collect(Collectors.toList());
+ }
+
+ private static final class ArgumentsSplit {
+ private final int startIndex;
+ private final @Nullable Integer endIndex;
+
+ private final InputTypeStrategy inputTypeStrategy;
+
+ public ArgumentsSplit(
+ int startIndex,
+ Integer endIndex,
+ InputTypeStrategy inputTypeStrategy) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.inputTypeStrategy = inputTypeStrategy;
+ }
+ }
+
+ private static final class SplitCallContext implements CallContext {
+
+ private final ArgumentsSplit split;
+ private final CallContext originalCallContext;
+
+ private SplitCallContext(
+ ArgumentsSplit split,
+ CallContext originalCallContext) {
+ this.split = split;
+ this.originalCallContext = originalCallContext;
+ }
+
+ @Override
+ public DataTypeFactory getDataTypeFactory() {
+ return originalCallContext.getDataTypeFactory();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return originalCallContext.getFunctionDefinition();
+ }
+
+ @Override
+ public boolean isArgumentLiteral(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public boolean isArgumentNull(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public <T> Optional<T> getArgumentValue(int pos, Class<T>
clazz) {
+ return originalCallContext.getArgumentValue(pos +
split.startIndex, clazz);
+ }
+
+ @Override
+ public String getName() {
+ return originalCallContext.getName();
+ }
+
+ @Override
+ public List<DataType> getArgumentDataTypes() {
+ List<DataType> originalArgumentDataTypes =
originalCallContext.getArgumentDataTypes();
+ return originalArgumentDataTypes.subList(
+ split.startIndex,
+ split.endIndex != null ? split.endIndex :
originalArgumentDataTypes.size());
+ }
+
+ @Override
+ public Optional<DataType> getOutputDataType() {
+ return originalCallContext.getOutputDataType();
+ }
+ }
+
+ /**
+ * A Builder for {@link SubSequenceInputTypeStrategy}.
+ */
+ @PublicEvolving
Review comment:
currently, all built-in strategies are internal
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
##########
@@ -61,6 +62,17 @@
*/
public static final WildcardInputTypeStrategy WILDCARD = new
WildcardInputTypeStrategy();
+ /**
+ * An strategy that lets you apply other strategies for subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most
of the cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the
arguments.
+ */
+ public static SubSequenceStrategyBuilder startSequences() {
Review comment:
Shall we call it `patternSequence`? It reminds me of a regex or
something similar. Or `compositeSequence`? The name should indicate that this
is not the default go-to method for sequences.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
##########
@@ -90,9 +91,8 @@ public static UnresolvedCallExpression isNull(Expression
input) {
return call(IS_NULL, input);
}
- public static UnresolvedCallExpression ifThenElse(Expression condition,
Expression ifTrue,
- Expression ifFalse) {
- return call(IF, condition, ifTrue, ifFalse);
+ public static UnresolvedCallExpression ifThenElse(Expression condition,
Expression ifTrue, Expression ifFalse) {
+ return call(IF, call(IS_TRUE, condition), ifTrue, ifFalse);
Review comment:
could you elaborate on this change? So if the condition is NULL we will
always execute the false branch?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -63,13 +73,40 @@ public ArgumentCount getArgumentCount() {
.collect(Collectors.toList())
);
+ if (!commonType.isPresent()) {
+ throw callContext.newValidationError("Could not find a
common type for arguments: %s", argumentDataTypes);
Review comment:
only throw if `throwOnFailure` is true
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -23,37 +23,47 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.table.types.utils.TypeConversions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
- * {@link InputTypeStrategy} specific for {@link
org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
- *
- * <p>It expects at least one argument. All the arguments must have a common
super type.
+ * An {@link InputTypeStrategy} that expects that all arguments have a common
type.
*/
@Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public class SameArgumentsInputTypeStrategy implements InputTypeStrategy {
+ private static final Signature.Argument COMMON_ARGUMENT =
Signature.Argument.of("<COMMON>");
+ private final ArgumentCount argumentCount;
+
+ public SameArgumentsInputTypeStrategy(ArgumentCount argumentCount) {
+ this.argumentCount = argumentCount;
+ }
+
@Override
public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.from(1);
+ return argumentCount;
}
@Override
public Optional<List<DataType>> inferInputTypes(
- CallContext callContext,
- boolean throwOnFailure) {
+ CallContext callContext,
+ boolean throwOnFailure) {
List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
- if (argumentDataTypes.size() == 0) {
- return Optional.empty();
+
+ if (callContext.getArgumentDataTypes()
+ .stream()
+ .anyMatch(dataType -> dataType.getLogicalType()
instanceof LegacyTypeInformationType)) {
Review comment:
convert to logical types at the beginning of this method to reuse them
here and below
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
+ this.argumentsSplits = argumentsSplits;
Review comment:
nit: add some `Preconditions.checkNotNull` checks
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SameArgumentsInputTypeStrategy.java
##########
@@ -63,13 +73,40 @@ public ArgumentCount getArgumentCount() {
.collect(Collectors.toList())
);
+ if (!commonType.isPresent()) {
+ throw callContext.newValidationError("Could not find a
common type for arguments: %s", argumentDataTypes);
+ }
+
return commonType.map(type -> Collections.nCopies(
argumentDataTypes.size(),
TypeConversions.fromLogicalToDataType(type)));
}
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
- return
Collections.singletonList(Signature.of(Signature.Argument.of("*")));
+ Optional<Integer> minCount = argumentCount.getMinCount();
+ Optional<Integer> maxCount = argumentCount.getMaxCount();
+
+ int numberOfMandatoryArguments = 0;
+ if (minCount.isPresent()) {
+ numberOfMandatoryArguments = minCount.get();
+ }
+
+ if (maxCount.isPresent()) {
+ List<Signature> signatures = new ArrayList<>();
+ IntStream.range(numberOfMandatoryArguments,
maxCount.get() + 1)
Review comment:
nit: use mapToObj and collect?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubSequenceInputTypeStrategy.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that lets you apply other strategies for
subsequences of
+ * the actual arguments.
+ *
+ * <p>The {@link SequenceInputTypeStrategy} should be preferred in most of the
cases. Use this strategy
+ * only if you need to apply a common logic to a subsequence of the arguments.
+ */
+@Internal
+public final class SubSequenceInputTypeStrategy implements InputTypeStrategy {
+
+ private final List<ArgumentsSplit> argumentsSplits;
+ private final ArgumentCount argumentCount;
+
+ private SubSequenceInputTypeStrategy(List<ArgumentsSplit>
argumentsSplits, ArgumentCount argumentCount) {
+ this.argumentsSplits = argumentsSplits;
+ this.argumentCount = argumentCount;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext,
+ boolean throwOnFailure) {
+
+ List<DataType> result = new ArrayList<>();
+
+ for (ArgumentsSplit argumentsSplit : argumentsSplits) {
+ Optional<List<DataType>> splitDataTypes =
argumentsSplit.inputTypeStrategy
+ .inferInputTypes(new
SplitCallContext(argumentsSplit, callContext), throwOnFailure);
+
+ if (splitDataTypes.isPresent()) {
+ result.addAll(splitDataTypes.get());
+ } else {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Could not infer arguments in
range: [%d, %d].",
+ argumentsSplit.startIndex,
+ argumentsSplit.endIndex);
+ }
+ return Optional.empty();
+ }
+ }
+
+ return Optional.of(result);
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return argumentsSplits.stream()
+ .map(computeSubSignatures(definition))
+ .reduce(this::crossJoin)
+ .get()
+ .stream().map(Signature::of)
+ .collect(Collectors.toList());
+ }
+
+ private Function<ArgumentsSplit, List<List<Signature.Argument>>>
computeSubSignatures(
+ FunctionDefinition definition) {
+ return argumentsSplit ->
argumentsSplit.inputTypeStrategy.getExpectedSignatures(definition)
+ .stream()
+ .map(Signature::getArguments)
+ .collect(Collectors.toList());
+ }
+
+ private List<List<Signature.Argument>> crossJoin(
+ List<List<Signature.Argument>> signatureStarts,
+ List<List<Signature.Argument>> signatureEnds) {
+ return signatureStarts.stream()
+ .flatMap(x ->
+ signatureEnds.stream().map(y -> {
+ ArrayList<Signature.Argument> joined = new
ArrayList<>(x);
+ joined.addAll(y);
+ return joined;
+ }))
+ .collect(Collectors.toList());
+ }
+
+ private static final class ArgumentsSplit {
+ private final int startIndex;
+ private final @Nullable Integer endIndex;
+
+ private final InputTypeStrategy inputTypeStrategy;
+
+ public ArgumentsSplit(
+ int startIndex,
+ Integer endIndex,
+ InputTypeStrategy inputTypeStrategy) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.inputTypeStrategy = inputTypeStrategy;
+ }
+ }
+
+ private static final class SplitCallContext implements CallContext {
+
+ private final ArgumentsSplit split;
+ private final CallContext originalCallContext;
+
+ private SplitCallContext(
+ ArgumentsSplit split,
+ CallContext originalCallContext) {
+ this.split = split;
+ this.originalCallContext = originalCallContext;
+ }
+
+ @Override
+ public DataTypeFactory getDataTypeFactory() {
+ return originalCallContext.getDataTypeFactory();
+ }
+
+ @Override
+ public FunctionDefinition getFunctionDefinition() {
+ return originalCallContext.getFunctionDefinition();
+ }
+
+ @Override
+ public boolean isArgumentLiteral(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public boolean isArgumentNull(int pos) {
+ return originalCallContext.isArgumentLiteral(pos +
split.startIndex);
+ }
+
+ @Override
+ public <T> Optional<T> getArgumentValue(int pos, Class<T>
clazz) {
+ return originalCallContext.getArgumentValue(pos +
split.startIndex, clazz);
+ }
+
+ @Override
+ public String getName() {
+ return originalCallContext.getName();
+ }
+
+ @Override
+ public List<DataType> getArgumentDataTypes() {
+ List<DataType> originalArgumentDataTypes =
originalCallContext.getArgumentDataTypes();
+ return originalArgumentDataTypes.subList(
+ split.startIndex,
+ split.endIndex != null ? split.endIndex :
originalArgumentDataTypes.size());
+ }
+
+ @Override
+ public Optional<DataType> getOutputDataType() {
+ return originalCallContext.getOutputDataType();
+ }
+ }
+
+ /**
+ * A Builder for {@link SubSequenceInputTypeStrategy}.
+ */
+ @PublicEvolving
+ public static class SubSequenceStrategyBuilder {
+ private final List<ArgumentsSplit> argumentsSplits = new
ArrayList<>();
+ private int currentPos = 0;
+
+ /**
+ * Defines that we expect a single argument at the next
position.
+ */
+ public SubSequenceStrategyBuilder argument(ArgumentTypeStrategy
argumentTypeStrategy) {
+ SequenceInputTypeStrategy singleArgumentStrategy = new
SequenceInputTypeStrategy(
+
Collections.singletonList(argumentTypeStrategy), null);
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
currentPos + 1, singleArgumentStrategy));
+ currentPos += 1;
+ return this;
+ }
+
+ /**
+ * Defines a common {@link InputTypeStrategy} for the next
arguments. Given
+ * input strategy must expect a constant number of arguments.
That means that both
+ * the minimum and maximum number of arguments must be defined
and equal to each other.
+ *
+ * <p>If you need a varying logic use {@link
#finishWithvarying(InputTypeStrategy)}.
+ */
+ public SubSequenceStrategyBuilder subSequence(InputTypeStrategy
inputTypeStrategy) {
+ Optional<Integer> maxCount =
inputTypeStrategy.getArgumentCount().getMaxCount();
+ Optional<Integer> minCount =
inputTypeStrategy.getArgumentCount().getMinCount();
+ if (!maxCount.isPresent() || !minCount.isPresent() ||
!maxCount.get().equals(minCount.get())) {
+ throw new IllegalArgumentException("Both the
minimum and maximum number of expected arguments must" +
+ " be defined and equal to each other.");
+ }
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
currentPos + maxCount.get(), inputTypeStrategy));
+ currentPos += maxCount.get();
+ return this;
+ }
+
+ /**
+ * Defines a common {@link InputTypeStrategy} for the next
arguments. Given
+ * input strategy must expect a varying number of arguments.
That means that the
+ * maximum number of arguments must not be defined.
+ */
+ public InputTypeStrategy finishWithvarying(InputTypeStrategy
inputTypeStrategy) {
+ ArgumentCount strategyArgumentCount =
inputTypeStrategy.getArgumentCount();
+ Optional<Integer> maxCount =
strategyArgumentCount.getMaxCount();
+ if (maxCount.isPresent()) {
+ throw new IllegalArgumentException("The maximum
number of arguments must not be defined.");
+ }
+ argumentsSplits.add(new ArgumentsSplit(currentPos,
null, inputTypeStrategy));
+ int minCount = currentPos;
+ Optional<Integer> strategyMinCount =
strategyArgumentCount.getMinCount();
+ if (strategyMinCount.isPresent()) {
Review comment:
Same comment as before: why don't we use the fluent `.orElse` instead of
`.isPresent()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org