This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 940c3a345b0 [FLINK-26782] Remove PlannerExpression and related
940c3a345b0 is described below
commit 940c3a345b0a8ce10f25876068191a5e4a015434
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Nov 17 17:52:53 2023 +0100
[FLINK-26782] Remove PlannerExpression and related
---
.../flink/table/catalog/FunctionCatalog.java | 30 +-
.../apache/flink/table/catalog/FunctionLookup.java | 4 -
.../table/delegation/PlannerTypeInferenceUtil.java | 40 --
.../resolver/rules/ResolveCallByArgumentsRule.java | 28 +-
.../flink/table/utils/FunctionLookupMock.java | 40 --
.../functions/AggregateFunctionDefinition.java | 12 +-
.../LegacyUserDefinedFunctionInference.java | 529 +++++++++++++++++++++
.../table/functions/ScalarFunctionDefinition.java | 9 +-
.../TableAggregateFunctionDefinition.java | 12 +-
.../table/functions/TableFunctionDefinition.java | 11 +-
.../expressions/PlannerTypeInferenceUtilImpl.java | 153 ------
.../table/planner/delegation/PlannerBase.scala | 4 -
.../planner/expressions/ExpressionBridge.scala | 29 --
.../table/planner/expressions/InputTypeSpec.scala | 67 ---
.../planner/expressions/PlannerExpression.scala | 85 ----
.../expressions/PlannerExpressionConverter.scala | 221 ---------
.../expressions/PlannerExpressionUtils.scala | 67 ---
.../flink/table/planner/expressions/TreeNode.scala | 110 -----
.../table/planner/expressions/aggregations.scala | 83 ----
.../flink/table/planner/expressions/call.scala | 138 ------
.../planner/expressions/fieldExpression.scala | 125 -----
.../flink/table/planner/expressions/literals.scala | 83 ----
.../flink/table/planner/expressions/symbols.scala | 121 -----
23 files changed, 569 insertions(+), 1432 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 06e275c2742..83074cd020b 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -44,7 +43,6 @@ import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceUri;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -81,12 +79,6 @@ public final class FunctionCatalog {
private final Map<String, CatalogFunction> tempSystemFunctions;
private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions;
- /**
- * Temporary utility until the new type inference is fully functional. It
needs to be set by the
- * planner.
- */
- private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
-
public FunctionCatalog(
ReadableConfig config,
ResourceManager resourceManager,
@@ -98,8 +90,7 @@ public final class FunctionCatalog {
catalogManager,
moduleManager,
new LinkedHashMap<>(),
- new LinkedHashMap<>(),
- null);
+ new LinkedHashMap<>());
}
private FunctionCatalog(
@@ -108,19 +99,13 @@ public final class FunctionCatalog {
CatalogManager catalogManager,
ModuleManager moduleManager,
Map<String, CatalogFunction> tempSystemFunctions,
- Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions,
- PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
+ Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions) {
this.config = checkNotNull(config);
this.resourceManager = checkNotNull(resourceManager);
this.catalogManager = checkNotNull(catalogManager);
this.moduleManager = checkNotNull(moduleManager);
this.tempSystemFunctions = tempSystemFunctions;
this.tempCatalogFunctions = tempCatalogFunctions;
- this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
- }
-
- public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil
plannerTypeInferenceUtil) {
- this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
}
/** Registers a temporary system function. */
@@ -431,14 +416,6 @@ public final class FunctionCatalog {
UnresolvedIdentifier identifier) {
return FunctionCatalog.this.lookupFunction(identifier);
}
-
- @Override
- public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
- Preconditions.checkNotNull(
- plannerTypeInferenceUtil,
- "A planner should have set the type inference
utility.");
- return plannerTypeInferenceUtil;
- }
};
}
@@ -807,8 +784,7 @@ public final class FunctionCatalog {
catalogManager,
moduleManager,
tempSystemFunctions,
- tempCatalogFunctions,
- plannerTypeInferenceUtil);
+ tempCatalogFunctions);
}
private void registerCatalogFunction(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
index c5b985a48ae..6e20409f8d3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import java.util.Optional;
@@ -51,7 +50,4 @@ public interface FunctionLookup {
"Required built-in function
[%s] could not be found in any catalog.",
definition.getName())));
}
-
- /** Temporary utility until the new type inference is fully functional. */
- PlannerTypeInferenceUtil getPlannerTypeInferenceUtil();
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
deleted file mode 100644
index 28c1f67981f..00000000000
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.delegation;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-
-import java.util.List;
-
-/**
- * Temporary utility for validation and output type inference until all {@code
PlannerExpression}
- * are upgraded to work with {@link TypeInferenceUtil}.
- */
-@Internal
-public interface PlannerTypeInferenceUtil {
-
- /** Same behavior as {@link
TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}. */
- TypeInferenceUtil.Result runTypeInference(
- UnresolvedCallExpression unresolvedCall, List<ResolvedExpression>
resolvedArgs);
-}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index eb182cc954b..468f85942d3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -161,7 +160,11 @@ final class ResolveCallByArgumentsRule implements
ResolverRule {
newInference,
resolvedArgs,
surroundingInfo))
- .orElseGet(() ->
runLegacyTypeInference(unresolvedCall, resolvedArgs)));
+ .orElseThrow(
+ () ->
+ new TableException(
+ "Could not get a type
inference for function: "
+ + name)));
}
@Override
@@ -235,13 +238,6 @@ final class ResolveCallByArgumentsRule implements
ResolverRule {
/** Temporary method until all calls define a type inference. */
private Optional<TypeInference>
getOptionalTypeInference(FunctionDefinition definition) {
- if (definition instanceof ScalarFunctionDefinition
- || definition instanceof TableFunctionDefinition
- || definition instanceof AggregateFunctionDefinition
- || definition instanceof TableAggregateFunctionDefinition)
{
- return Optional.empty();
- }
-
final TypeInference inference =
definition.getTypeInference(resolutionContext.typeFactory());
if (inference.getOutputTypeStrategy() != TypeStrategies.MISSING) {
@@ -275,20 +271,6 @@ final class ResolveCallByArgumentsRule implements
ResolverRule {
return unresolvedCall.resolve(adaptedArguments,
inferenceResult.getOutputDataType());
}
- private ResolvedExpression runLegacyTypeInference(
- UnresolvedCallExpression unresolvedCall,
List<ResolvedExpression> resolvedArgs) {
-
- final PlannerTypeInferenceUtil util =
-
resolutionContext.functionLookup().getPlannerTypeInferenceUtil();
-
- final Result inferenceResult =
util.runTypeInference(unresolvedCall, resolvedArgs);
-
- final List<ResolvedExpression> adaptedArguments =
- adaptArguments(inferenceResult, resolvedArgs);
-
- return unresolvedCall.resolve(adaptedArguments,
inferenceResult.getOutputDataType());
- }
-
/** Adapts the arguments according to the properties of the {@link
Result}. */
private List<ResolvedExpression> adaptArguments(
Result inferenceResult, List<ResolvedExpression> resolvedArgs)
{
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
index d12ec985d2c..65bdcd201d1 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
@@ -18,26 +18,16 @@
package org.apache.flink.table.utils;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-import org.apache.flink.table.types.utils.TypeConversions;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* A test implementation for a {@link FunctionLookup}. It mocks away a few
features of a {@link
@@ -106,34 +96,4 @@ public final class FunctionLookupMock implements
FunctionLookup {
return ContextResolvedFunction.permanent(
FunctionIdentifier.of(definition.getName()), definition);
}
-
- @Override
- public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
- return (unresolvedCall, resolvedArgs) -> {
- FunctionDefinition functionDefinition =
unresolvedCall.getFunctionDefinition();
- List<DataType> argumentTypes =
- resolvedArgs.stream()
- .map(ResolvedExpression::getOutputDataType)
- .collect(Collectors.toList());
- if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
- return new TypeInferenceUtil.Result(argumentTypes, null,
DataTypes.BOOLEAN());
- } else if
(functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) {
- return new TypeInferenceUtil.Result(argumentTypes, null,
DataTypes.BOOLEAN());
- } else if (functionDefinition instanceof ScalarFunctionDefinition)
{
- return new TypeInferenceUtil.Result(
- argumentTypes,
- null,
- // We do not support a full legacy type inference
here. We support only a
- // static result
- // type
- TypeConversions.fromLegacyInfoToDataType(
- ((ScalarFunctionDefinition) functionDefinition)
- .getScalarFunction()
- .getResultType(null)));
- }
-
- throw new IllegalArgumentException(
- "Unsupported builtin function in the test: " +
unresolvedCall);
- };
- }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
index aac4119f03b..e596743eec0 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
@@ -19,14 +19,16 @@
package org.apache.flink.table.functions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import java.util.Set;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
/**
* A "marker" function definition of an user-defined aggregate function that
uses the old type
* system stack.
@@ -77,8 +79,12 @@ public final class AggregateFunctionDefinition implements
FunctionDefinition {
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
- throw new TableException(
- "Functions implemented for the old type system are not
supported.");
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(
+
LegacyUserDefinedFunctionInference.getInputTypeStrategy(aggregateFunction))
+ .outputTypeStrategy(
+
TypeStrategies.explicit(fromLegacyInfoToDataType(resultTypeInfo)))
+ .build();
}
@Override
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
new file mode 100644
index 00000000000..9114e532bb7
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava31.com.google.common.primitives.Primitives;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * Ported {@code UserDefinedFunctionUtils} to run some of the type inference
for legacy functions in
+ * Table API.
+ */
+@Internal
+@Deprecated
+public class LegacyUserDefinedFunctionInference {
+
+ public static InputTypeStrategy
getInputTypeStrategy(ImperativeAggregateFunction<?, ?> func) {
+ return new InputTypeStrategy() {
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.any();
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ final DataType accType = getAccumulatorType(func);
+ final LogicalType[] input =
+ Stream.concat(Stream.of(accType),
argumentDataTypes.stream())
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+
+ final Optional<Optional<Method>> foundMethod =
+ Stream.of(
+ logicalTypesToInternalClasses(input),
+ logicalTypesToExternalClasses(input))
+ .map(
+ signature ->
+ getUserDefinedMethod(
+ func,
+ "accumulate",
+ signature,
+ input,
+ cls ->
+ Stream.concat(
+
Stream.of(accType),
+
Arrays.stream(cls)
+
.skip(1)
+
.map(
+
c ->
+
fromLegacyInfoToDataType(
+
TypeExtractor
+
.createTypeInfo(
+
c))))
+
.toArray(DataType[]::new)))
+ .filter(Optional::isPresent)
+ .findAny();
+
+ if (foundMethod.isPresent()) {
+ return Optional.of(argumentDataTypes);
+ } else {
+ return callContext.fail(throwOnFailure, "");
+ }
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return getSignatures(func, "accumulate");
+ }
+ };
+ }
+
+ public static InputTypeStrategy getInputTypeStrategy(TableFunction<?>
func) {
+ return new InputTypeStrategy() {
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.any();
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ final LogicalType[] input =
+ argumentDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+
+ final Optional<Method> foundMethod =
+ getUserDefinedMethod(
+ func,
+ "eval",
+ argumentDataTypes.stream()
+ .map(DataType::getConversionClass)
+ .toArray(Class<?>[]::new),
+ input,
+ cls ->
+ Arrays.stream(cls)
+ .map(
+ c ->
+
fromLegacyInfoToDataType(
+
TypeExtractor
+
.createTypeInfo(c)))
+ .toArray(DataType[]::new));
+
+ if (foundMethod.isPresent()) {
+ return Optional.of(argumentDataTypes);
+ } else {
+ return callContext.fail(
+ throwOnFailure, "Given parameters do not match any
signature.");
+ }
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return getSignatures(func, "eval");
+ }
+ };
+ }
+
+ public static TypeStrategy getOutputTypeStrategy(ScalarFunction func) {
+ return callContext -> {
+ final LogicalType[] params =
+ callContext.getArgumentDataTypes().stream()
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+ Optional<Class<?>[]> evalParams = getEvalMethodSignature(func,
params);
+ if (!evalParams.isPresent()) {
+ return Optional.empty();
+ }
+
+ final TypeInformation<?> userDefinedTypeInfo =
func.getResultType(evalParams.get());
+ if (userDefinedTypeInfo != null) {
+ return
Optional.of(fromLegacyInfoToDataType(userDefinedTypeInfo));
+ } else {
+ final Optional<Method> eval =
+ getUserDefinedMethod(
+ func,
+ "eval",
+ logicalTypesToExternalClasses(params),
+ params,
+ (paraClasses) ->
+
Arrays.stream(func.getParameterTypes(paraClasses))
+
.map(TypeConversions::fromLegacyInfoToDataType)
+ .toArray(DataType[]::new));
+ return eval.flatMap(m ->
TypeConversions.fromClassToDataType(m.getReturnType()));
+ }
+ };
+ }
+
+ public static InputTypeStrategy getInputTypeStrategy(ScalarFunction func) {
+ return new InputTypeStrategy() {
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.any();
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ final LogicalType[] input =
+ argumentDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+
+ final Optional<Method> foundMethod =
+ getUserDefinedMethod(
+ func,
+ "eval",
+ logicalTypesToExternalClasses(input),
+ input,
+ (paraClasses) ->
+
Arrays.stream(func.getParameterTypes(paraClasses))
+
.map(TypeConversions::fromLegacyInfoToDataType)
+ .toArray(DataType[]::new));
+
+ if (foundMethod.isPresent()) {
+ return Optional.of(argumentDataTypes);
+ } else {
+ return callContext.fail(
+ throwOnFailure, "Given parameters do not match any
signature.");
+ }
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return getSignatures(func, "eval");
+ }
+ };
+ }
+
+ private static Optional<Class<?>[]> getEvalMethodSignature(
+ ScalarFunction func, LogicalType[] expectedTypes) {
+ return getUserDefinedMethod(
+ func,
+ "eval",
+ logicalTypesToExternalClasses(expectedTypes),
+ expectedTypes,
+ (paraClasses) ->
+
Arrays.stream(func.getParameterTypes(paraClasses))
+
.map(TypeConversions::fromLegacyInfoToDataType)
+ .toArray(DataType[]::new))
+ .map(
+ m ->
+ getParamClassesConsiderVarArgs(
+ m.isVarArgs(),
+ m.getParameterTypes(),
+ expectedTypes.length));
+ }
+
+ private static Class<?>[] getParamClassesConsiderVarArgs(
+ boolean isVarArgs, Class<?>[] matchingSignature, int
expectedLength) {
+ return IntStream.range(0, expectedLength)
+ .mapToObj(
+ i -> {
+ if (i < matchingSignature.length - 1) {
+ return matchingSignature[i];
+ } else if (isVarArgs) {
+ return
matchingSignature[matchingSignature.length - 1]
+ .getComponentType();
+ } else {
+ // last argument is not an array type
+ return
matchingSignature[matchingSignature.length - 1];
+ }
+ })
+ .toArray(Class<?>[]::new);
+ }
+
+ private static DataType getAccumulatorType(ImperativeAggregateFunction<?,
?> func) {
+ final TypeInformation<?> accType = func.getAccumulatorType();
+ if (accType != null) {
+ return fromLegacyInfoToDataType(accType);
+ } else {
+ try {
+ return fromLegacyInfoToDataType(
+ TypeExtractor.createTypeInfo(
+ func, ImperativeAggregateFunction.class,
func.getClass(), 1));
+ } catch (InvalidTypesException ite) {
+ throw new TableException(
+ String.format(
+ "Cannot infer generic type of %s}. You can
override"
+ + "
ImperativeAggregateFunction.getAccumulatorType()"
+ + " to specify the type.",
+ func.getClass()),
+ ite);
+ }
+ }
+ }
+
+ private static Class<?>[] logicalTypesToExternalClasses(LogicalType[]
types) {
+ return Arrays.stream(types)
+ .map(
+ t -> {
+ if (t == null) {
+ return null;
+ } else {
+ return TypeConversions.fromLogicalToDataType(t)
+ .getConversionClass();
+ }
+ })
+ .toArray(Class<?>[]::new);
+ }
+
+ private static Class<?>[] logicalTypesToInternalClasses(LogicalType[]
types) {
+ return Arrays.stream(types)
+ .map(
+ t -> {
+ if (t == null) {
+ return null;
+ } else {
+ return toInternalConversionClass(t);
+ }
+ })
+ .toArray(Class<?>[]::new);
+ }
+
+ private static boolean parameterClassEquals(Class<?> candidate, Class<?>
expected) {
+ return candidate == null
+ || candidate == expected
+ || expected == Object.class
+ || candidate == Object.class
+ || // Special case when we don't know the type
+ expected.isPrimitive() && Primitives.wrap(expected) ==
candidate
+ || candidate == Date.class && (expected == Integer.class ||
expected == int.class)
+ || candidate == Time.class && (expected == Integer.class ||
expected == int.class)
+ || candidate == StringData.class && expected == String.class
+ || candidate == String.class && expected == StringData.class
+ || candidate == TimestampData.class && expected ==
LocalDateTime.class
+ || candidate == Timestamp.class && expected ==
TimestampData.class
+ || candidate == TimestampData.class && expected ==
Timestamp.class
+ || candidate == LocalDateTime.class && expected ==
TimestampData.class
+ || candidate == TimestampData.class && expected ==
Instant.class
+ || candidate == Instant.class && expected ==
TimestampData.class
+ || RowData.class.isAssignableFrom(candidate) && expected ==
Row.class
+ || candidate == Row.class &&
RowData.class.isAssignableFrom(expected)
+ || RowData.class.isAssignableFrom(candidate) && expected ==
RowData.class
+ || candidate == RowData.class &&
RowData.class.isAssignableFrom(expected)
+ || candidate == DecimalData.class && expected ==
BigDecimal.class
+ || candidate == BigDecimal.class && expected ==
DecimalData.class
+ || (candidate.isArray()
+ && expected.isArray()
+ && candidate.getComponentType() != null
+ && expected.getComponentType() == Object.class);
+ }
+
+ private static boolean parameterDataTypeEquals(LogicalType internal,
DataType parameterType) {
+ if (internal.is(LogicalTypeRoot.RAW)
+ && parameterType.getLogicalType().is(LogicalTypeRoot.RAW)) {
+ return
TypeConversions.fromLogicalToDataType(internal).getConversionClass()
+ == parameterType.getConversionClass();
+ } else {
+ // There is a special equal to GenericType. We need rewrite type
extract to RowData
+ // etc...
+ return parameterType.getLogicalType() == internal
+ || toInternalConversionClass(internal)
+ ==
toInternalConversionClass(parameterType.getLogicalType());
+ }
+ }
+
+ private static Optional<Method> getUserDefinedMethod(
+ UserDefinedFunction function,
+ String methodName,
+ Class<?>[] methodSignature,
+ LogicalType[] internalTypes,
+ Function<Class<?>[], DataType[]> parameterTypesFunc) {
+ final List<Method> methods = checkAndExtractMethods(function,
methodName);
+
+ final List<Method> filteredMethods =
+ methods.stream()
+ .filter(
+ cur -> {
+ final Class<?>[] parameterTypes =
cur.getParameterTypes();
+ final DataType[] parameterDataTypes =
+
parameterTypesFunc.apply(parameterTypes);
+ if (cur.isVarArgs()) {
+ return varArgsMethodMatch(
+ methodSignature,
+ internalTypes,
+ parameterTypes,
+ parameterDataTypes);
+
+ } else {
+ return methodMatch(
+ methodSignature,
+ internalTypes,
+ parameterTypes,
+ parameterDataTypes);
+ }
+ })
+ .collect(Collectors.toList());
+
+ // if there is a fixed method, compiler will call this method
preferentially
+
+ final List<Method> found =
+ filteredMethods.stream()
+ .sorted(Comparator.comparing(Method::isVarArgs,
Boolean::compareTo))
+ .filter(cur ->
!Modifier.isVolatile(cur.getModifiers()))
+ .collect(Collectors.toList());
+
+ final int foundCount = found.size();
+ if (foundCount == 1) {
+ return Optional.of(found.get(0));
+ } else if (foundCount > 1) {
+ if (Arrays.asList(methodSignature).contains(Object.class)) {
+ return Optional.of(found.get(0));
+ }
+
+ final List<Method> nonObjectParameterMethods =
+ found.stream()
+ .filter(
+ m ->
+
!Arrays.asList(m.getParameterTypes())
+ .contains(Object.class))
+ .collect(Collectors.toList());
+
+ if (nonObjectParameterMethods.size() == 1) {
+ return Optional.of(nonObjectParameterMethods.get(0));
+ }
+
+ throw new ValidationException(
+ String.format(
+ "Found multiple '%s' methods which match the
signature.", methodName));
+ }
+
+ return Optional.empty();
+ }
+
+ private static boolean methodMatch(
+ Class<?>[] methodSignature,
+ LogicalType[] internalTypes,
+ Class<?>[] parameterTypes,
+ DataType[] parameterDataTypes) {
+ // match parameters of signature to actual parameters
+ return methodSignature.length == parameterTypes.length
+ && IntStream.range(0, parameterTypes.length)
+ .allMatch(
+ i ->
+
parameterClassEquals(methodSignature[i], parameterTypes[i])
+ || parameterDataTypeEquals(
+ internalTypes[i],
parameterDataTypes[i]));
+ }
+
+ private static boolean varArgsMethodMatch(
+ Class<?>[] methodSignature,
+ LogicalType[] internalTypes,
+ Class<?>[] parameterTypes,
+ DataType[] parameterDataTypes) {
+ if (methodSignature.length == 0 && parameterTypes.length == 1) {
+ return true;
+ } else {
+ return IntStream.range(0, methodSignature.length)
+ .allMatch(
+ i -> {
+ if (i < parameterTypes.length - 1) {
+ return parameterClassEquals(
+ methodSignature[i],
parameterTypes[i])
+ || parameterDataTypeEquals(
+ internalTypes[i],
parameterDataTypes[i]);
+ } else {
+ return parameterClassEquals(
+ methodSignature[i],
+
parameterTypes[parameterTypes.length - 1]
+
.getComponentType())
+ || parameterDataTypeEquals(
+ internalTypes[i],
parameterDataTypes[i]);
+ }
+ });
+ }
+ }
+
+ private static List<Method> checkAndExtractMethods(
+ UserDefinedFunction function, String methodName) {
+ final List<Method> methods =
+ Arrays.stream(function.getClass().getMethods())
+ .filter(
+ m -> {
+ final int modifiers = m.getModifiers();
+ return m.getName().equals(methodName)
+ && Modifier.isPublic(modifiers)
+ && !Modifier.isAbstract(modifiers)
+ && !(function instanceof
TableFunction
+ &&
Modifier.isStatic(modifiers));
+ })
+ .collect(Collectors.toList());
+
+ if (methods.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Function class '%s' does not implement at least "
+ + "one method named '%s' which is public,
not abstract and "
+ + "(in case of table functions) not
static.",
+ function.getClass().getCanonicalName(),
methodName));
+ }
+
+ return methods;
+ }
+
+ private static List<Signature> getSignatures(UserDefinedFunction func,
String methodName) {
+ return checkAndExtractMethods(func, methodName).stream()
+ .map(Method::getParameterTypes)
+ .map(
+ sig ->
+ Signature.of(
+ Arrays.stream(sig)
+ .map(s ->
Signature.Argument.of(s.getSimpleName()))
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
+ }
+
+ private LegacyUserDefinedFunctionInference() {}
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
index 5ae2752e164..fe3c8f9c3cc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.functions;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.util.Preconditions;
@@ -62,8 +61,12 @@ public final class ScalarFunctionDefinition implements
FunctionDefinition {
@Override
public TypeInference getTypeInference(DataTypeFactory factory) {
- throw new TableException(
- "Functions implemented for the old type system are not
supported.");
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(
+
LegacyUserDefinedFunctionInference.getInputTypeStrategy(scalarFunction))
+ .outputTypeStrategy(
+
LegacyUserDefinedFunctionInference.getOutputTypeStrategy(scalarFunction))
+ .build();
}
@Override
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
index e8633863764..7a5015816c0 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
@@ -19,14 +19,16 @@
package org.apache.flink.table.functions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import java.util.Set;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
/**
* A "marker" function definition of an user-defined table aggregate function
that uses the old type
* system stack.
@@ -77,8 +79,12 @@ public final class TableAggregateFunctionDefinition
implements FunctionDefinitio
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
- throw new TableException(
- "Functions implemented for the old type system are not
supported.");
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(
+
LegacyUserDefinedFunctionInference.getInputTypeStrategy(aggregateFunction))
+ .outputTypeStrategy(
+
TypeStrategies.explicit(fromLegacyInfoToDataType(resultTypeInfo)))
+ .build();
}
@Override
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
index bc540be441c..b9ccd161770 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
@@ -19,14 +19,16 @@
package org.apache.flink.table.functions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import java.util.Set;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
/**
* A "marker" function definition of an user-defined table function that uses
the old type system
* stack.
@@ -68,8 +70,11 @@ public final class TableFunctionDefinition implements
FunctionDefinition {
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
- throw new TableException(
- "Functions implemented for the old type system are not
supported.");
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(
+
LegacyUserDefinedFunctionInference.getInputTypeStrategy(tableFunction))
+
.outputTypeStrategy(TypeStrategies.explicit(fromLegacyInfoToDataType(resultType)))
+ .build();
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
deleted file mode 100644
index 0882dc77d8c..00000000000
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.planner.typeutils.TypeCoercion;
-import org.apache.flink.table.planner.validate.ValidationFailure;
-import org.apache.flink.table.planner.validate.ValidationResult;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static
org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toJava;
-import static
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType;
-import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/** Implementation of {@link PlannerTypeInferenceUtil}. */
-@Internal
-public final class PlannerTypeInferenceUtilImpl implements
PlannerTypeInferenceUtil {
-
- public static final PlannerTypeInferenceUtil INSTANCE = new
PlannerTypeInferenceUtilImpl();
-
- private static final PlannerExpressionConverter CONVERTER =
- PlannerExpressionConverter.INSTANCE();
-
- @Override
- public TypeInferenceUtil.Result runTypeInference(
- UnresolvedCallExpression unresolvedCall, List<ResolvedExpression>
resolvedArgs) {
- // We should not try to resolve the children again with the old type
stack
- // The arguments might have been resolved with the new stack already.
In that case the
- // resolution will fail.
- unresolvedCall = unresolvedCall.replaceArgs(new
ArrayList<>(resolvedArgs));
- final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER);
-
- if (plannerCall instanceof InputTypeSpec) {
- return resolveWithCastedAssignment(
- unresolvedCall,
- resolvedArgs,
- toJava(((InputTypeSpec) plannerCall).expectedTypes()),
- plannerCall.resultType());
- } else {
- validateArguments(plannerCall);
-
- final List<DataType> expectedArgumentTypes =
- resolvedArgs.stream()
- .map(ResolvedExpression::getOutputDataType)
- .collect(Collectors.toList());
-
- return new TypeInferenceUtil.Result(
- expectedArgumentTypes,
- null,
- fromLegacyInfoToDataType(plannerCall.resultType()));
- }
- }
-
- private TypeInferenceUtil.Result resolveWithCastedAssignment(
- UnresolvedCallExpression unresolvedCall,
- List<ResolvedExpression> args,
- List<TypeInformation<?>> expectedTypes,
- TypeInformation<?> resultType) {
-
- final List<PlannerExpression> plannerArgs =
- unresolvedCall.getChildren().stream()
- .map(e -> e.accept(CONVERTER))
- .collect(Collectors.toList());
-
- final List<DataType> castedArgs =
- IntStream.range(0, plannerArgs.size())
- .mapToObj(
- idx ->
- castIfNeeded(
- args.get(idx),
- plannerArgs.get(idx),
- expectedTypes.get(idx)))
- .collect(Collectors.toList());
-
- return new TypeInferenceUtil.Result(castedArgs, null,
fromLegacyInfoToDataType(resultType));
- }
-
- private void validateArguments(PlannerExpression plannerCall) {
- if (!plannerCall.valid()) {
- throw new ValidationException(
- getValidationErrorMessage(plannerCall)
- .orElse(
- "Unexpected behavior, validation failed
but can't get error messages!"));
- }
- }
-
- /**
- * Return the validation error message of this {@link PlannerExpression}
or return the
- * validation error message of it's children if it passes the validation.
Return empty if all
- * validation succeeded.
- */
- private Optional<String> getValidationErrorMessage(PlannerExpression
plannerCall) {
- ValidationResult validationResult = plannerCall.validateInput();
- if (validationResult instanceof ValidationFailure) {
- return Optional.of(((ValidationFailure)
validationResult).message());
- } else {
- for (Expression plannerExpression : plannerCall.getChildren()) {
- Optional<String> errorMessage =
- getValidationErrorMessage((PlannerExpression)
plannerExpression);
- if (errorMessage.isPresent()) {
- return errorMessage;
- }
- }
- }
- return Optional.empty();
- }
-
- private DataType castIfNeeded(
- ResolvedExpression child,
- PlannerExpression plannerChild,
- TypeInformation<?> expectedType) {
- TypeInformation<?> actualType = plannerChild.resultType();
- if (actualType.equals(expectedType)) {
- return child.getOutputDataType();
- } else if (TypeCoercion.canSafelyCast(
- fromTypeInfoToLogicalType(actualType),
fromTypeInfoToLogicalType(expectedType))) {
- return fromLegacyInfoToDataType(expectedType);
- } else {
- throw new ValidationException(
- String.format(
- "Incompatible type of argument: %s Expected: %s",
child, expectedType));
- }
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 48fe89d5303..59e44941a8e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -36,7 +36,6 @@ import org.apache.flink.table.planner.calcite._
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
import org.apache.flink.table.planner.connectors.DynamicSinkUtils
import
org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
-import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
@@ -96,9 +95,6 @@ abstract class PlannerBase(
classLoader: ClassLoader)
extends Planner {
- // temporary utility until we don't use planner expressions anymore
-
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
-
private var parserFactory: ParserFactory = _
private var parser: Parser = _
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
deleted file mode 100644
index f226cff31a6..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor}
-
-/** Bridges between API [[Expression]]s (for both Java and Scala) and final
expression stack. */
-class ExpressionBridge[E <: Expression](finalVisitor: ExpressionVisitor[E]) {
-
- def bridge(expression: Expression): E = {
- // convert to final expressions
- expression.accept(finalVisitor)
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
deleted file mode 100644
index 42b1c9da118..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.planner.validate._
-
-import scala.collection.mutable
-
-/** Expressions that have strict data type specification on its inputs. */
-trait InputTypeSpec extends PlannerExpression {
-
- /**
- * Input type specification for each child.
- *
- * For example, [[Power]] expecting both of the children be of double type
should use:
- * {{{
- * def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO ::
DOUBLE_TYPE_INFO :: Nil
- * }}}
- *
- * Inputs that don't match the expected type will be safely casted to a
higher type. Therefore,
- * use the decimal type with caution as all numeric types would be casted to
a very inefficient
- * type.
- */
- private[flink] def expectedTypes: Seq[TypeInformation[_]]
-
- override private[flink] def validateInput(): ValidationResult = {
- val typeMismatches = mutable.ArrayBuffer.empty[String]
-
- if (expectedTypes.size != children.size) {
- return ValidationFailure(
- s"""|$this fails on input type size checking: expected types
size[${expectedTypes.size}].
- |Operands types size[${children.size}].
- |""".stripMargin)
- }
-
- children.zip(expectedTypes).zipWithIndex.foreach {
- case ((e, tpe), i) =>
- if (e.resultType != tpe) {
- typeMismatches += s"expecting $tpe on ${i}th input, get
${e.resultType}"
- }
- }
- if (typeMismatches.isEmpty) {
- ValidationSuccess
- } else {
- ValidationFailure(
- s"""|$this fails on input type checking:
${typeMismatches.mkString("[", ", ", "]")}.
- |Operand should be casted to proper type
- |""".stripMargin)
- }
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
deleted file mode 100644
index 89054e53f46..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor}
-import org.apache.flink.table.planner.validate.{ValidationResult,
ValidationSuccess}
-
-import _root_.scala.collection.JavaConversions._
-
-import java.util
-
-abstract class PlannerExpression extends TreeNode[PlannerExpression] with
Expression {
-
- /**
- * Returns the [[TypeInformation]] for evaluating this expression. It is
sometimes not available
- * until the expression is valid.
- */
- private[flink] def resultType: TypeInformation[_]
-
- /** One pass validation of the expression tree in post order. */
- private[flink] lazy val valid: Boolean = childrenValid &&
validateInput().isSuccess
-
- private[flink] def childrenValid: Boolean = children.forall(_.valid)
-
- /**
- * Check input data types, inputs number or other properties specified by
this expression. Return
- * `ValidationSuccess` if it pass the check, or `ValidationFailure` with
supplement message
- * explaining the error. Note: we should only call this method until
`childrenValid == true`
- */
- private[flink] def validateInput(): ValidationResult = ValidationSuccess
-
- private[flink] def checkEquals(other: PlannerExpression): Boolean = {
- if (this.getClass != other.getClass) {
- false
- } else {
- def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
- elements1.length == elements2.length &&
elements1.zip(elements2).forall {
- case (e1: PlannerExpression, e2: PlannerExpression) =>
e1.checkEquals(e2)
- case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
- case (i1, i2) => i1 == i2
- }
- }
- val elements1 = this.productIterator.toSeq
- val elements2 = other.productIterator.toSeq
- checkEquality(elements1, elements2)
- }
- }
-
- override def asSummaryString(): String = toString
-
- override def getChildren: util.List[Expression] = children
-
- override def accept[R](visitor: ExpressionVisitor[R]): R =
visitor.visit(this)
-}
-
-abstract class BinaryExpression extends PlannerExpression {
- private[flink] def left: PlannerExpression
- private[flink] def right: PlannerExpression
- private[flink] def children = Seq(left, right)
-}
-
-abstract class UnaryExpression extends PlannerExpression {
- private[flink] def child: PlannerExpression
- private[flink] def children = Seq(child)
-}
-
-abstract class LeafExpression extends PlannerExpression {
- private[flink] val children = Nil
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
deleted file mode 100644
index bf3c13698f8..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions._
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
-import
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL,
SYMBOL}
-import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasLength,
hasPrecision, hasScale}
-
-import _root_.scala.collection.JavaConverters._
-
-/** Visitor implementation for converting [[Expression]]s to
[[PlannerExpression]]s. */
-class PlannerExpressionConverter private extends
ApiExpressionVisitor[PlannerExpression] {
-
- override def visit(call: CallExpression): PlannerExpression = {
- val definition = call.getFunctionDefinition
- translateCall(
- definition,
- call.getChildren.asScala,
- () =>
- definition match {
- case ROW | ARRAY | MAP =>
ApiResolvedExpression(call.getOutputDataType)
- case _ =>
- if (
- definition.getKind == FunctionKind.AGGREGATE ||
- definition.getKind == FunctionKind.TABLE_AGGREGATE
- ) {
- ApiResolvedAggregateCallExpression(call)
- } else {
- ApiResolvedExpression(call.getOutputDataType)
- }
- }
- )
- }
-
- override def visit(unresolvedCall: UnresolvedCallExpression):
PlannerExpression = {
- val definition = unresolvedCall.getFunctionDefinition
- translateCall(
- definition,
- unresolvedCall.getChildren.asScala,
- () => throw new TableException(s"Unsupported function definition:
$definition"))
- }
-
- private def translateCall(
- func: FunctionDefinition,
- children: Seq[Expression],
- unknownFunctionHandler: () => PlannerExpression): PlannerExpression = {
-
- val args = children.map(_.accept(this))
-
- func match {
- case sfd: ScalarFunctionDefinition =>
- val call = PlannerScalarFunctionCall(sfd.getScalarFunction, args)
- // it configures underlying state
- call.validateInput()
- call
-
- case tfd: TableFunctionDefinition =>
- PlannerTableFunctionCall(tfd.toString, tfd.getTableFunction, args,
tfd.getResultType)
-
- case afd: AggregateFunctionDefinition =>
- AggFunctionCall(
- afd.getAggregateFunction,
- afd.getResultTypeInfo,
- afd.getAccumulatorTypeInfo,
- args)
-
- case tafd: TableAggregateFunctionDefinition =>
- AggFunctionCall(
- tafd.getTableAggregateFunction,
- tafd.getResultTypeInfo,
- tafd.getAccumulatorTypeInfo,
- args)
-
- case _: FunctionDefinition =>
- unknownFunctionHandler()
- }
- }
-
- override def visit(literal: ValueLiteralExpression): PlannerExpression = {
- if (literal.getOutputDataType.getLogicalType.is(SYMBOL)) {
- val plannerSymbol =
getSymbol(literal.getValueAs(classOf[TableSymbol]).get())
- return SymbolPlannerExpression(plannerSymbol)
- }
-
- val typeInfo = getLiteralTypeInfo(literal)
- if (literal.isNull) {
- Null(typeInfo)
- } else {
- Literal(literal.getValueAs(typeInfo.getTypeClass).get(), typeInfo)
- }
- }
-
- /** This method makes the planner more lenient for new data types defined
for literals. */
- private def getLiteralTypeInfo(literal: ValueLiteralExpression):
TypeInformation[_] = {
- val logicalType = literal.getOutputDataType.getLogicalType
-
- if (logicalType.is(DECIMAL)) {
- if (literal.isNull) {
- return Types.BIG_DEC
- }
- val value = literal.getValueAs(classOf[java.math.BigDecimal]).get()
- if (hasPrecision(logicalType, value.precision()) &&
hasScale(logicalType, value.scale())) {
- return Types.BIG_DEC
- }
- } else if (logicalType.is(CHAR)) {
- if (literal.isNull) {
- return Types.STRING
- }
- val value = literal.getValueAs(classOf[java.lang.String]).get()
- if (hasLength(logicalType, value.length)) {
- return Types.STRING
- }
- }
-
- fromDataTypeToTypeInfo(literal.getOutputDataType)
- }
-
- private def getSymbol(symbol: TableSymbol): PlannerSymbol = symbol match {
- case TimeIntervalUnit.MILLENNIUM => PlannerTimeIntervalUnit.MILLENNIUM
- case TimeIntervalUnit.CENTURY => PlannerTimeIntervalUnit.CENTURY
- case TimeIntervalUnit.DECADE => PlannerTimeIntervalUnit.DECADE
- case TimeIntervalUnit.YEAR => PlannerTimeIntervalUnit.YEAR
- case TimeIntervalUnit.YEAR_TO_MONTH =>
PlannerTimeIntervalUnit.YEAR_TO_MONTH
- case TimeIntervalUnit.QUARTER => PlannerTimeIntervalUnit.QUARTER
- case TimeIntervalUnit.MONTH => PlannerTimeIntervalUnit.MONTH
- case TimeIntervalUnit.WEEK => PlannerTimeIntervalUnit.WEEK
- case TimeIntervalUnit.DAY => PlannerTimeIntervalUnit.DAY
- case TimeIntervalUnit.DAY_TO_HOUR => PlannerTimeIntervalUnit.DAY_TO_HOUR
- case TimeIntervalUnit.DAY_TO_MINUTE =>
PlannerTimeIntervalUnit.DAY_TO_MINUTE
- case TimeIntervalUnit.DAY_TO_SECOND =>
PlannerTimeIntervalUnit.DAY_TO_SECOND
- case TimeIntervalUnit.HOUR => PlannerTimeIntervalUnit.HOUR
- case TimeIntervalUnit.SECOND => PlannerTimeIntervalUnit.SECOND
- case TimeIntervalUnit.HOUR_TO_MINUTE =>
PlannerTimeIntervalUnit.HOUR_TO_MINUTE
- case TimeIntervalUnit.HOUR_TO_SECOND =>
PlannerTimeIntervalUnit.HOUR_TO_SECOND
- case TimeIntervalUnit.MINUTE => PlannerTimeIntervalUnit.MINUTE
- case TimeIntervalUnit.MINUTE_TO_SECOND =>
PlannerTimeIntervalUnit.MINUTE_TO_SECOND
- case TimeIntervalUnit.MILLISECOND => PlannerTimeIntervalUnit.MILLISECOND
- case TimeIntervalUnit.MICROSECOND => PlannerTimeIntervalUnit.MICROSECOND
- case TimeIntervalUnit.NANOSECOND => PlannerTimeIntervalUnit.NANOSECOND
- case TimeIntervalUnit.EPOCH => PlannerTimeIntervalUnit.EPOCH
- case TimePointUnit.YEAR => PlannerTimePointUnit.YEAR
- case TimePointUnit.MONTH => PlannerTimePointUnit.MONTH
- case TimePointUnit.DAY => PlannerTimePointUnit.DAY
- case TimePointUnit.HOUR => PlannerTimePointUnit.HOUR
- case TimePointUnit.MINUTE => PlannerTimePointUnit.MINUTE
- case TimePointUnit.SECOND => PlannerTimePointUnit.SECOND
- case TimePointUnit.QUARTER => PlannerTimePointUnit.QUARTER
- case TimePointUnit.WEEK => PlannerTimePointUnit.WEEK
- case TimePointUnit.MILLISECOND => PlannerTimePointUnit.MILLISECOND
- case TimePointUnit.MICROSECOND => PlannerTimePointUnit.MICROSECOND
-
- case _ =>
- throw new TableException("Unsupported symbol: " + symbol)
- }
-
- override def visit(fieldReference: FieldReferenceExpression):
PlannerExpression = {
- PlannerResolvedFieldReference(
- fieldReference.getName,
- fromDataTypeToTypeInfo(fieldReference.getOutputDataType))
- }
-
- override def visit(fieldReference: UnresolvedReferenceExpression):
PlannerExpression = {
- UnresolvedFieldReference(fieldReference.getName)
- }
-
- override def visit(typeLiteral: TypeLiteralExpression): PlannerExpression = {
- ApiResolvedExpression(typeLiteral.getOutputDataType)
- }
-
- override def visit(tableRef: TableReferenceExpression): PlannerExpression = {
- TableReference(
- tableRef.asInstanceOf[TableReferenceExpression].getName,
- tableRef.asInstanceOf[TableReferenceExpression].getQueryOperation
- )
- }
-
- override def visit(local: LocalReferenceExpression): PlannerExpression =
- PlannerLocalReference(local.getName,
fromDataTypeToTypeInfo(local.getOutputDataType))
-
- override def visit(lookupCall: LookupCallExpression): PlannerExpression =
- throw new TableException("Unsupported function call: " + lookupCall)
-
- override def visit(sqlCall: SqlCallExpression): PlannerExpression =
- throw new TableException("Unsupported function call: " + sqlCall)
-
- override def visit(other: ResolvedExpression): PlannerExpression =
visitNonApiExpression(other)
-
- override def visitNonApiExpression(other: Expression): PlannerExpression = {
- other match {
- // already converted planner expressions will pass this visitor without
modification
- case plannerExpression: PlannerExpression => plannerExpression
- case expr: RexNodeExpression => RexPlannerExpression(expr.getRexNode)
- case _ =>
- throw new TableException("Unrecognized expression: " + other)
- }
- }
-}
-
-object PlannerExpressionConverter {
- val INSTANCE: PlannerExpressionConverter = new PlannerExpressionConverter
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
deleted file mode 100644
index 9bf173ddcf0..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-object PlannerExpressionUtils {
-
- private[flink] def isTimeIntervalLiteral(expr: PlannerExpression): Boolean =
expr match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
- case _ => false
- }
-
- private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean =
expr match {
- case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
- case _ => false
- }
-
- private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr
match {
- case r: PlannerResolvedFieldReference if
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
- true
- case _ => false
- }
-
- private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean =
expr match {
- case r: PlannerResolvedFieldReference
- if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
- true
- case _ => false
- }
-
- private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean =
expr match {
- case r: PlannerResolvedFieldReference
- if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
- true
- case _ => false
- }
-
- private[flink] def toTime(expr: PlannerExpression): FlinkTime = expr match {
- case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- FlinkTime.milliseconds(value)
- case _ => throw new IllegalArgumentException()
- }
-
- private[flink] def toLong(expr: PlannerExpression): Long = expr match {
- case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
- case _ => throw new IllegalArgumentException()
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
deleted file mode 100644
index 182198a45e0..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
-
-/** Generic base class for trees that can be transformed and traversed. */
-abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
-
- /**
- * List of child nodes that should be considered when doing transformations.
Other values in the
- * Product will not be transformed, only handed through.
- */
- private[flink] def children: Seq[A]
-
- /** Tests for equality by first testing for reference equality. */
- private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other)
|| this == other
-
- /** Do tree transformation in post order. */
- private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
- def childrenTransform(rule: PartialFunction[A, A]): A = {
- var changed = false
- val newArgs = productIterator.map {
- case arg: TreeNode[_] if children.contains(arg) =>
- val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
- if (!(newChild.fastEquals(arg))) {
- changed = true
- newChild
- } else {
- arg
- }
- case args: Traversable[_] =>
- args.map {
- case arg: TreeNode[_] if children.contains(arg) =>
- val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
- if (!(newChild.fastEquals(arg))) {
- changed = true
- newChild
- } else {
- arg
- }
- case other => other
- }
- case nonChild: AnyRef => nonChild
- case null => null
- }.toArray
- if (changed) makeCopy(newArgs) else this
- }
-
- val afterChildren = childrenTransform(rule)
- if (afterChildren.fastEquals(this)) {
- rule.applyOrElse(this, identity[A])
- } else {
- rule.applyOrElse(afterChildren, identity[A])
- }
- }
-
- /** Runs the given function first on the node and then recursively on all
its children. */
- private[flink] def preOrderVisit(f: A => Unit): Unit = {
- f(this)
- children.foreach(_.preOrderVisit(f))
- }
-
- /**
- * Creates a new copy of this expression with new children. This is used
during transformation if
- * children change.
- */
- private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
- val ctors = getClass.getConstructors.filter(_.getParameterCount > 0)
- if (ctors.isEmpty) {
- throw new RuntimeException(s"No valid constructor for
${getClass.getSimpleName}")
- }
-
- val defaultCtor = ctors
- .find {
- ctor =>
- if (ctor.getParameterCount != newArgs.length) {
- false
- } else if (newArgs.contains(null)) {
- false
- } else {
- val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
- TypeInfoCheckUtils.isAssignable(argsClasses,
ctor.getParameterTypes)
- }
- }
- .getOrElse(ctors.maxBy(_.getParameterCount))
-
- try {
- defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
- } catch {
- case e: Throwable =>
- throw new RuntimeException(s"Fail to copy tree node
${getClass.getName}.", e)
- }
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
deleted file mode 100644
index d3c19b8c4a1..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.MultisetTypeInfo
-import org.apache.flink.table.expressions.CallExpression
-import org.apache.flink.table.functions.ImperativeAggregateFunction
-import org.apache.flink.table.planner.calcite.FlinkTypeSystem
-import
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
-import org.apache.flink.table.planner.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo,
fromTypeInfoToLogicalType}
-import org.apache.flink.table.types.utils.TypeConversions
-import
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
-sealed abstract class Aggregation extends PlannerExpression {
-
- override def toString = s"Aggregate"
-
-}
-
-/**
- * Wrapper for call expressions resolved already in the API with the new type
inference stack.
- * Separate from [[ApiResolvedExpression]] because others' expressions
validation logic check for
- * the [[Aggregation]] trait.
- */
-case class ApiResolvedAggregateCallExpression(resolvedCall: CallExpression)
extends Aggregation {
-
- private[flink] val children = Nil
-
- override private[flink] def resultType: TypeInformation[_] = TypeConversions
- .fromDataTypeToLegacyInfo(resolvedCall.getOutputDataType)
-}
-
-/** Expression for calling a user-defined (table)aggregate function. */
-case class AggFunctionCall(
- aggregateFunction: ImperativeAggregateFunction[_, _],
- resultTypeInfo: TypeInformation[_],
- accTypeInfo: TypeInformation[_],
- args: Seq[PlannerExpression])
- extends Aggregation {
-
- override private[flink] def children: Seq[PlannerExpression] = args
-
- override def resultType: TypeInformation[_] = resultTypeInfo
-
- override def validateInput(): ValidationResult = {
- val signature = children.map(_.resultType)
- // look for a signature that matches the input types
- val foundSignature =
- getAccumulateMethodSignature(aggregateFunction,
signature.map(fromTypeInfoToLogicalType))
- if (foundSignature.isEmpty) {
- ValidationFailure(
- s"Given parameters do not match any signature. \n" +
- s"Actual:
${signatureToString(signature.map(fromLegacyInfoToDataType))} \n" +
- s"Expected: ${getMethodSignatures(aggregateFunction, "accumulate")
- .map(_.drop(1))
- .map(signatureToString)
- .sorted // make sure order to verify error messages in tests
- .mkString(", ")}")
- } else {
- ValidationSuccess
- }
- }
-
- override def toString: String =
s"${aggregateFunction.getClass.getSimpleName}($args)"
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
deleted file mode 100644
index a6401076759..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation,
Types}
-import org.apache.flink.table.expressions.CallExpression
-import org.apache.flink.table.functions._
-import
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.planner.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-import
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
-import
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions
-import
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-/**
- * Wrapper for expressions that have been resolved already in the API with the
new type inference
- * stack.
- */
-case class ApiResolvedExpression(resolvedDataType: DataType) extends
LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
- TypeConversions.fromDataTypeToLegacyInfo(resolvedDataType)
-}
-
-/**
- * Over call with unresolved alias for over window.
- *
- * @param agg
- * The aggregation of the over call.
- * @param alias
- * The alias of the referenced over window.
- */
-case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def validateInput() =
- ValidationFailure(s"Over window with alias $alias could not be resolved.")
-
- override private[flink] def resultType = agg.resultType
-
- override private[flink] def children = Seq()
-}
-
-/**
- * Expression for calling a user-defined scalar functions.
- *
- * @param scalarFunction
- * scalar function to be called (might be overloaded)
- * @param parameters
- * actual parameters that determine target evaluation method
- */
-case class PlannerScalarFunctionCall(
- scalarFunction: ScalarFunction,
- parameters: Seq[PlannerExpression])
- extends PlannerExpression {
-
- private var signature: Array[LogicalType] = _
-
- override private[flink] def children: Seq[PlannerExpression] = parameters
-
- override def toString =
- s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(",
")})"
-
- override private[flink] def resultType =
- fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(scalarFunction,
signature))
-
- override private[flink] def validateInput(): ValidationResult = {
- signature =
children.map(_.resultType).map(fromTypeInfoToLogicalType).toArray
- // look for a signature that matches the input types
- val foundSignature = getEvalMethodSignatureOption(scalarFunction,
signature)
- if (foundSignature.isEmpty) {
- ValidationFailure(
- s"Given parameters do not match any signature. \n" +
- s"Actual:
${signatureToString(signature.map(fromLogicalTypeToDataType))} \n" +
- s"Expected: ${signaturesToString(scalarFunction, "eval")}")
- } else {
- ValidationSuccess
- }
- }
-}
-
-/**
- * Expression for calling a user-defined table function with actual parameters.
- *
- * @param functionName
- * function name
- * @param tableFunction
- * user-defined table function
- * @param parameters
- * actual parameters of function
- * @param resultType
- * type information of returned table
- */
-case class PlannerTableFunctionCall(
- functionName: String,
- tableFunction: TableFunction[_],
- parameters: Seq[PlannerExpression],
- resultType: TypeInformation[_])
- extends PlannerExpression {
-
- override private[flink] def children: Seq[PlannerExpression] = parameters
-
- override def validateInput(): ValidationResult = {
- // look for a signature that matches the input types
- val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType)
- val foundMethod = getUserDefinedMethod(tableFunction, "eval", signature)
- if (foundMethod.isEmpty) {
- ValidationFailure(
- s"Given parameters of function '$functionName' do not match any
signature. \n" +
- s"Actual: ${signatureToString(signature)} \n" +
- s"Expected: ${signaturesToString(tableFunction, "eval")}")
- } else {
- ValidationSuccess
- }
- }
-
- override def toString =
- s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
deleted file mode 100644
index c8aa491a6ba..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api._
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory._
-import org.apache.flink.table.planner.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-
-import org.apache.calcite.rex.RexNode
-
-trait NamedExpression extends PlannerExpression {
- private[flink] def name: String
- private[flink] def toAttribute: Attribute
-}
-
-abstract class Attribute extends LeafExpression with NamedExpression {
- override private[flink] def toAttribute: Attribute = this
-
- private[flink] def withName(newName: String): Attribute
-}
-
-/** Dummy wrapper for expressions that were converted to RexNode in a
different way. */
-case class RexPlannerExpression(private[flink] val rexNode: RexNode) extends
LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
- fromLogicalTypeToTypeInfo(FlinkTypeFactory.toLogicalType(rexNode.getType))
-}
-
-case class UnresolvedFieldReference(name: String) extends Attribute {
-
- override def toString = s"'$name"
-
- override private[flink] def withName(newName: String): Attribute =
- UnresolvedFieldReference(newName)
-
- override private[flink] def resultType: TypeInformation[_] =
- throw new UnresolvedException(s"Calling resultType on ${this.getClass}.")
-
- override private[flink] def validateInput(): ValidationResult =
- ValidationFailure(s"Unresolved reference $name.")
-}
-
-case class PlannerResolvedFieldReference(name: String, resultType:
TypeInformation[_])
- extends Attribute {
-
- override def toString = s"'$name"
-
- override private[flink] def withName(newName: String): Attribute = {
- if (newName == name) {
- this
- } else {
- PlannerResolvedFieldReference(newName, resultType)
- }
- }
-}
-
-case class WindowReference(name: String, tpe: Option[TypeInformation[_]] =
None) extends Attribute {
-
- override private[flink] def resultType: TypeInformation[_] =
- tpe.getOrElse(throw new UnresolvedException("Could not resolve type of
referenced window."))
-
- override private[flink] def withName(newName: String): Attribute = {
- if (newName == name) {
- this
- } else {
- throw new ValidationException("Cannot rename window reference.")
- }
- }
-
- override def toString: String = s"'$name"
-}
-
-case class TableReference(name: String, tableOperation: QueryOperation)
- extends LeafExpression
- with NamedExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
- throw new UnresolvedException(s"Table reference '$name' has no result
type.")
-
- override private[flink] def toAttribute =
- throw new UnsupportedOperationException(s"A table reference '$name' can
not be an attribute.")
-
- override def toString: String = s"$name"
-}
-
-/** Expression to access the timestamp of a StreamRecord. */
-case class StreamRecordTimestamp() extends LeafExpression {
-
- override private[flink] def resultType = Types.LONG
-}
-
-/**
- * Special reference which represent a local field, such as aggregate buffers
or constants. We are
- * stored as class members, so the field can be referenced directly. We should
use an unique name to
- * locate the field.
- */
-case class PlannerLocalReference(name: String, resultType: TypeInformation[_])
extends Attribute {
-
- override def toString = s"'$name"
-
- override private[flink] def withName(newName: String): Attribute = {
- if (newName == name) this
- else PlannerLocalReference(newName, resultType)
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
deleted file mode 100644
index b1dfacba354..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo,
SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime => jLocalTime}
-import java.util.{Calendar, TimeZone}
-
-object Literal {
- private[flink] val UTC = TimeZone.getTimeZone("UTC")
-
- private[flink] def apply(l: Any): Literal = l match {
- case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
- case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
- case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
- case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
- case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
- case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
- case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
- case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
- case javaDec: java.math.BigDecimal => Literal(javaDec,
BasicTypeInfo.BIG_DEC_TYPE_INFO)
- case scalaDec: scala.math.BigDecimal =>
- Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
- case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
- case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
- case sqlTimestamp: Timestamp => Literal(sqlTimestamp,
SqlTimeTypeInfo.TIMESTAMP)
- case localDate: LocalDate => Literal(localDate,
LocalTimeTypeInfo.LOCAL_DATE)
- case localTime: jLocalTime => Literal(localTime,
LocalTimeTypeInfo.LOCAL_TIME)
- case localDateTime: LocalDateTime => Literal(localDateTime,
LocalTimeTypeInfo.LOCAL_DATE_TIME)
- }
-}
-
-case class Literal(value: Any, resultType: TypeInformation[_]) extends
LeafExpression {
- override def toString: String = resultType match {
- case _: BasicTypeInfo[_] => value.toString
- case _ @SqlTimeTypeInfo.DATE => value.toString + ".toDate"
- case _ @SqlTimeTypeInfo.TIME => value.toString + ".toTime"
- case _ @SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
- case _ @TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
- case _ @TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
- case _ => s"Literal($value, $resultType)"
- }
-
- /**
- * Convert a Date value to a Calendar. Calcite's fromCalendarField functions
use the Calendar.get
- * methods, so the raw values of the individual fields are preserved when
converted to the String
- * formats.
- *
- * @return
- * get the Calendar value
- */
- private def valueAsCalendar: Calendar = {
- val date = value.asInstanceOf[java.util.Date]
- val cal = Calendar.getInstance
- cal.setTime(date)
- cal
- }
-}
-
-@deprecated(
- "Use nullOf(TypeInformation) instead. It is available through the implicit
Scala DSL.",
- "1.8.0")
-case class Null(resultType: TypeInformation[_]) extends LeafExpression {
- override def toString = s"null"
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
deleted file mode 100644
index 2147aaa0c71..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.utils.DateTimeUtils.{TimeUnit, TimeUnitRange}
-
-import org.apache.calcite.sql.fun.SqlTrimFunction
-
-import scala.language.{existentials, implicitConversions}
-
-/** General expression class to represent a symbol. */
-case class SymbolPlannerExpression(symbol: PlannerSymbol) extends
LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
- throw new UnsupportedOperationException("This should not happen. A symbol
has no result type.")
-
- def toExpr: SymbolPlannerExpression = this // triggers implicit conversion
-
- override def toString: String = s"${symbol.symbols}.${symbol.name}"
-
-}
-
-/** Symbol that wraps a Calcite symbol in form of a Java enum. */
-trait PlannerSymbol {
- def symbols: PlannerSymbols
- def name: String
- def enum: Enum[_]
-}
-
-/** Enumeration of symbols. */
-abstract class PlannerSymbols extends Enumeration {
-
- class PlannerSymbolValue(e: Enum[_]) extends Val(e.name()) with
PlannerSymbol {
- override def symbols: PlannerSymbols = PlannerSymbols.this
-
- override def enum: Enum[_] = e
-
- override def name: String = toString()
- }
-
- final protected def Value(enum: Enum[_]): PlannerSymbolValue = new
PlannerSymbolValue(enum)
-
- implicit def symbolToExpression(symbol: PlannerSymbolValue):
SymbolPlannerExpression =
- SymbolPlannerExpression(symbol)
-
-}
-
-/** Units for working with time intervals. */
-object PlannerTimeIntervalUnit extends PlannerSymbols {
-
- type PlannerTimeIntervalUnit = PlannerSymbolValue
-
- val MILLENNIUM = Value(TimeUnitRange.MILLENNIUM)
- val CENTURY = Value(TimeUnitRange.CENTURY)
- val DECADE = Value(TimeUnitRange.DECADE)
- val YEAR = Value(TimeUnitRange.YEAR)
- val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
- val QUARTER = Value(TimeUnitRange.QUARTER)
- val MONTH = Value(TimeUnitRange.MONTH)
- val WEEK = Value(TimeUnitRange.WEEK)
- val DAY = Value(TimeUnitRange.DAY)
- val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
- val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
- val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
- val HOUR = Value(TimeUnitRange.HOUR)
- val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
- val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
- val MINUTE = Value(TimeUnitRange.MINUTE)
- val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
- val SECOND = Value(TimeUnitRange.SECOND)
- val MILLISECOND = Value(TimeUnitRange.MILLISECOND)
- val MICROSECOND = Value(TimeUnitRange.MICROSECOND)
- val NANOSECOND = Value(TimeUnitRange.NANOSECOND)
- val EPOCH = Value(TimeUnitRange.EPOCH)
-
-}
-
-/** Units for working with time points. */
-object PlannerTimePointUnit extends PlannerSymbols {
-
- type PlannerTimePointUnit = PlannerSymbolValue
-
- val YEAR = Value(TimeUnit.YEAR)
- val MONTH = Value(TimeUnit.MONTH)
- val DAY = Value(TimeUnit.DAY)
- val HOUR = Value(TimeUnit.HOUR)
- val MINUTE = Value(TimeUnit.MINUTE)
- val SECOND = Value(TimeUnit.SECOND)
- val QUARTER = Value(TimeUnit.QUARTER)
- val WEEK = Value(TimeUnit.WEEK)
- val MILLISECOND = Value(TimeUnit.MILLISECOND)
- val MICROSECOND = Value(TimeUnit.MICROSECOND)
-
-}
-
-/** Modes for trimming strings. */
-object PlannerTrimMode extends PlannerSymbols {
-
- type PlannerTrimMode = PlannerSymbolValue
-
- val BOTH = Value(SqlTrimFunction.Flag.BOTH)
- val LEADING = Value(SqlTrimFunction.Flag.LEADING)
- val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
-
-}