This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 940c3a345b0 [FLINK-26782] Remove PlannerExpression and related
940c3a345b0 is described below

commit 940c3a345b0a8ce10f25876068191a5e4a015434
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Nov 17 17:52:53 2023 +0100

    [FLINK-26782] Remove PlannerExpression and related
---
 .../flink/table/catalog/FunctionCatalog.java       |  30 +-
 .../apache/flink/table/catalog/FunctionLookup.java |   4 -
 .../table/delegation/PlannerTypeInferenceUtil.java |  40 --
 .../resolver/rules/ResolveCallByArgumentsRule.java |  28 +-
 .../flink/table/utils/FunctionLookupMock.java      |  40 --
 .../functions/AggregateFunctionDefinition.java     |  12 +-
 .../LegacyUserDefinedFunctionInference.java        | 529 +++++++++++++++++++++
 .../table/functions/ScalarFunctionDefinition.java  |   9 +-
 .../TableAggregateFunctionDefinition.java          |  12 +-
 .../table/functions/TableFunctionDefinition.java   |  11 +-
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 153 ------
 .../table/planner/delegation/PlannerBase.scala     |   4 -
 .../planner/expressions/ExpressionBridge.scala     |  29 --
 .../table/planner/expressions/InputTypeSpec.scala  |  67 ---
 .../planner/expressions/PlannerExpression.scala    |  85 ----
 .../expressions/PlannerExpressionConverter.scala   | 221 ---------
 .../expressions/PlannerExpressionUtils.scala       |  67 ---
 .../flink/table/planner/expressions/TreeNode.scala | 110 -----
 .../table/planner/expressions/aggregations.scala   |  83 ----
 .../flink/table/planner/expressions/call.scala     | 138 ------
 .../planner/expressions/fieldExpression.scala      | 125 -----
 .../flink/table/planner/expressions/literals.scala |  83 ----
 .../flink/table/planner/expressions/symbols.scala  | 121 -----
 23 files changed, 569 insertions(+), 1432 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 06e275c2742..83074cd020b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -44,7 +43,6 @@ import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceUri;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
@@ -81,12 +79,6 @@ public final class FunctionCatalog {
     private final Map<String, CatalogFunction> tempSystemFunctions;
     private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions;
 
-    /**
-     * Temporary utility until the new type inference is fully functional. It 
needs to be set by the
-     * planner.
-     */
-    private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
-
     public FunctionCatalog(
             ReadableConfig config,
             ResourceManager resourceManager,
@@ -98,8 +90,7 @@ public final class FunctionCatalog {
                 catalogManager,
                 moduleManager,
                 new LinkedHashMap<>(),
-                new LinkedHashMap<>(),
-                null);
+                new LinkedHashMap<>());
     }
 
     private FunctionCatalog(
@@ -108,19 +99,13 @@ public final class FunctionCatalog {
             CatalogManager catalogManager,
             ModuleManager moduleManager,
             Map<String, CatalogFunction> tempSystemFunctions,
-            Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions,
-            PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
+            Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions) {
         this.config = checkNotNull(config);
         this.resourceManager = checkNotNull(resourceManager);
         this.catalogManager = checkNotNull(catalogManager);
         this.moduleManager = checkNotNull(moduleManager);
         this.tempSystemFunctions = tempSystemFunctions;
         this.tempCatalogFunctions = tempCatalogFunctions;
-        this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
-    }
-
-    public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil 
plannerTypeInferenceUtil) {
-        this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
     }
 
     /** Registers a temporary system function. */
@@ -431,14 +416,6 @@ public final class FunctionCatalog {
                     UnresolvedIdentifier identifier) {
                 return FunctionCatalog.this.lookupFunction(identifier);
             }
-
-            @Override
-            public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
-                Preconditions.checkNotNull(
-                        plannerTypeInferenceUtil,
-                        "A planner should have set the type inference 
utility.");
-                return plannerTypeInferenceUtil;
-            }
         };
     }
 
@@ -807,8 +784,7 @@ public final class FunctionCatalog {
                 catalogManager,
                 moduleManager,
                 tempSystemFunctions,
-                tempCatalogFunctions,
-                plannerTypeInferenceUtil);
+                tempCatalogFunctions);
     }
 
     private void registerCatalogFunction(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
index c5b985a48ae..6e20409f8d3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 
 import java.util.Optional;
@@ -51,7 +50,4 @@ public interface FunctionLookup {
                                                 "Required built-in function 
[%s] could not be found in any catalog.",
                                                 definition.getName())));
     }
-
-    /** Temporary utility until the new type inference is fully functional. */
-    PlannerTypeInferenceUtil getPlannerTypeInferenceUtil();
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
deleted file mode 100644
index 28c1f67981f..00000000000
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.delegation;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-
-import java.util.List;
-
-/**
- * Temporary utility for validation and output type inference until all {@code 
PlannerExpression}
- * are upgraded to work with {@link TypeInferenceUtil}.
- */
-@Internal
-public interface PlannerTypeInferenceUtil {
-
-    /** Same behavior as {@link 
TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}. */
-    TypeInferenceUtil.Result runTypeInference(
-            UnresolvedCallExpression unresolvedCall, List<ResolvedExpression> 
resolvedArgs);
-}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index eb182cc954b..468f85942d3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -161,7 +160,11 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                                                     newInference,
                                                     resolvedArgs,
                                                     surroundingInfo))
-                            .orElseGet(() -> 
runLegacyTypeInference(unresolvedCall, resolvedArgs)));
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    "Could not get a type 
inference for function: "
+                                                            + name)));
         }
 
         @Override
@@ -235,13 +238,6 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
 
         /** Temporary method until all calls define a type inference. */
         private Optional<TypeInference> 
getOptionalTypeInference(FunctionDefinition definition) {
-            if (definition instanceof ScalarFunctionDefinition
-                    || definition instanceof TableFunctionDefinition
-                    || definition instanceof AggregateFunctionDefinition
-                    || definition instanceof TableAggregateFunctionDefinition) 
{
-                return Optional.empty();
-            }
-
             final TypeInference inference =
                     
definition.getTypeInference(resolutionContext.typeFactory());
             if (inference.getOutputTypeStrategy() != TypeStrategies.MISSING) {
@@ -275,20 +271,6 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
             return unresolvedCall.resolve(adaptedArguments, 
inferenceResult.getOutputDataType());
         }
 
-        private ResolvedExpression runLegacyTypeInference(
-                UnresolvedCallExpression unresolvedCall, 
List<ResolvedExpression> resolvedArgs) {
-
-            final PlannerTypeInferenceUtil util =
-                    
resolutionContext.functionLookup().getPlannerTypeInferenceUtil();
-
-            final Result inferenceResult = 
util.runTypeInference(unresolvedCall, resolvedArgs);
-
-            final List<ResolvedExpression> adaptedArguments =
-                    adaptArguments(inferenceResult, resolvedArgs);
-
-            return unresolvedCall.resolve(adaptedArguments, 
inferenceResult.getOutputDataType());
-        }
-
         /** Adapts the arguments according to the properties of the {@link 
Result}. */
         private List<ResolvedExpression> adaptArguments(
                 Result inferenceResult, List<ResolvedExpression> resolvedArgs) 
{
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
index d12ec985d2c..65bdcd201d1 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FunctionLookupMock.java
@@ -18,26 +18,16 @@
 
 package org.apache.flink.table.utils;
 
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-import org.apache.flink.table.types.utils.TypeConversions;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * A test implementation for a {@link FunctionLookup}. It mocks away a few 
features of a {@link
@@ -106,34 +96,4 @@ public final class FunctionLookupMock implements 
FunctionLookup {
         return ContextResolvedFunction.permanent(
                 FunctionIdentifier.of(definition.getName()), definition);
     }
-
-    @Override
-    public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
-        return (unresolvedCall, resolvedArgs) -> {
-            FunctionDefinition functionDefinition = 
unresolvedCall.getFunctionDefinition();
-            List<DataType> argumentTypes =
-                    resolvedArgs.stream()
-                            .map(ResolvedExpression::getOutputDataType)
-                            .collect(Collectors.toList());
-            if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
-                return new TypeInferenceUtil.Result(argumentTypes, null, 
DataTypes.BOOLEAN());
-            } else if 
(functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) {
-                return new TypeInferenceUtil.Result(argumentTypes, null, 
DataTypes.BOOLEAN());
-            } else if (functionDefinition instanceof ScalarFunctionDefinition) 
{
-                return new TypeInferenceUtil.Result(
-                        argumentTypes,
-                        null,
-                        // We do not support a full legacy type inference 
here. We support only a
-                        // static result
-                        // type
-                        TypeConversions.fromLegacyInfoToDataType(
-                                ((ScalarFunctionDefinition) functionDefinition)
-                                        .getScalarFunction()
-                                        .getResultType(null)));
-            }
-
-            throw new IllegalArgumentException(
-                    "Unsupported builtin function in the test: " + 
unresolvedCall);
-        };
-    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
index aac4119f03b..e596743eec0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunctionDefinition.java
@@ -19,14 +19,16 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Objects;
 import java.util.Set;
 
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * A "marker" function definition of an user-defined aggregate function that 
uses the old type
  * system stack.
@@ -77,8 +79,12 @@ public final class AggregateFunctionDefinition implements 
FunctionDefinition {
 
     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
-        throw new TableException(
-                "Functions implemented for the old type system are not 
supported.");
+        return TypeInference.newBuilder()
+                .inputTypeStrategy(
+                        
LegacyUserDefinedFunctionInference.getInputTypeStrategy(aggregateFunction))
+                .outputTypeStrategy(
+                        
TypeStrategies.explicit(fromLegacyInfoToDataType(resultTypeInfo)))
+                .build();
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
new file mode 100644
index 00000000000..9114e532bb7
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LegacyUserDefinedFunctionInference.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava31.com.google.common.primitives.Primitives;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+/**
+ * Ported {@code UserDefinedFunctionUtils} to run some of the type inference 
for legacy functions in
+ * Table API.
+ */
+@Internal
+@Deprecated
+public class LegacyUserDefinedFunctionInference {
+
+    public static InputTypeStrategy 
getInputTypeStrategy(ImperativeAggregateFunction<?, ?> func) {
+        return new InputTypeStrategy() {
+            @Override
+            public ArgumentCount getArgumentCount() {
+                return ConstantArgumentCount.any();
+            }
+
+            @Override
+            public Optional<List<DataType>> inferInputTypes(
+                    CallContext callContext, boolean throwOnFailure) {
+                final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+                final DataType accType = getAccumulatorType(func);
+                final LogicalType[] input =
+                        Stream.concat(Stream.of(accType), 
argumentDataTypes.stream())
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new);
+
+                final Optional<Optional<Method>> foundMethod =
+                        Stream.of(
+                                        logicalTypesToInternalClasses(input),
+                                        logicalTypesToExternalClasses(input))
+                                .map(
+                                        signature ->
+                                                getUserDefinedMethod(
+                                                        func,
+                                                        "accumulate",
+                                                        signature,
+                                                        input,
+                                                        cls ->
+                                                                Stream.concat(
+                                                                               
 Stream.of(accType),
+                                                                               
 Arrays.stream(cls)
+                                                                               
         .skip(1)
+                                                                               
         .map(
+                                                                               
                 c ->
+                                                                               
                         fromLegacyInfoToDataType(
+                                                                               
                                 TypeExtractor
+                                                                               
                                         .createTypeInfo(
+                                                                               
                                                 c))))
+                                                                        
.toArray(DataType[]::new)))
+                                .filter(Optional::isPresent)
+                                .findAny();
+
+                if (foundMethod.isPresent()) {
+                    return Optional.of(argumentDataTypes);
+                } else {
+                    return callContext.fail(throwOnFailure, "");
+                }
+            }
+
+            @Override
+            public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+                return getSignatures(func, "accumulate");
+            }
+        };
+    }
+
+    public static InputTypeStrategy getInputTypeStrategy(TableFunction<?> 
func) {
+        return new InputTypeStrategy() {
+            @Override
+            public ArgumentCount getArgumentCount() {
+                return ConstantArgumentCount.any();
+            }
+
+            @Override
+            public Optional<List<DataType>> inferInputTypes(
+                    CallContext callContext, boolean throwOnFailure) {
+                final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+                final LogicalType[] input =
+                        argumentDataTypes.stream()
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new);
+
+                final Optional<Method> foundMethod =
+                        getUserDefinedMethod(
+                                func,
+                                "eval",
+                                argumentDataTypes.stream()
+                                        .map(DataType::getConversionClass)
+                                        .toArray(Class<?>[]::new),
+                                input,
+                                cls ->
+                                        Arrays.stream(cls)
+                                                .map(
+                                                        c ->
+                                                                
fromLegacyInfoToDataType(
+                                                                        
TypeExtractor
+                                                                               
 .createTypeInfo(c)))
+                                                .toArray(DataType[]::new));
+
+                if (foundMethod.isPresent()) {
+                    return Optional.of(argumentDataTypes);
+                } else {
+                    return callContext.fail(
+                            throwOnFailure, "Given parameters do not match any 
signature.");
+                }
+            }
+
+            @Override
+            public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+                return getSignatures(func, "eval");
+            }
+        };
+    }
+
+    public static TypeStrategy getOutputTypeStrategy(ScalarFunction func) {
+        return callContext -> {
+            final LogicalType[] params =
+                    callContext.getArgumentDataTypes().stream()
+                            .map(DataType::getLogicalType)
+                            .toArray(LogicalType[]::new);
+            Optional<Class<?>[]> evalParams = getEvalMethodSignature(func, 
params);
+            if (!evalParams.isPresent()) {
+                return Optional.empty();
+            }
+
+            final TypeInformation<?> userDefinedTypeInfo = 
func.getResultType(evalParams.get());
+            if (userDefinedTypeInfo != null) {
+                return 
Optional.of(fromLegacyInfoToDataType(userDefinedTypeInfo));
+            } else {
+                final Optional<Method> eval =
+                        getUserDefinedMethod(
+                                func,
+                                "eval",
+                                logicalTypesToExternalClasses(params),
+                                params,
+                                (paraClasses) ->
+                                        
Arrays.stream(func.getParameterTypes(paraClasses))
+                                                
.map(TypeConversions::fromLegacyInfoToDataType)
+                                                .toArray(DataType[]::new));
+                return eval.flatMap(m -> 
TypeConversions.fromClassToDataType(m.getReturnType()));
+            }
+        };
+    }
+
+    public static InputTypeStrategy getInputTypeStrategy(ScalarFunction func) {
+        return new InputTypeStrategy() {
+            @Override
+            public ArgumentCount getArgumentCount() {
+                return ConstantArgumentCount.any();
+            }
+
+            @Override
+            public Optional<List<DataType>> inferInputTypes(
+                    CallContext callContext, boolean throwOnFailure) {
+                final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+                final LogicalType[] input =
+                        argumentDataTypes.stream()
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new);
+
+                final Optional<Method> foundMethod =
+                        getUserDefinedMethod(
+                                func,
+                                "eval",
+                                logicalTypesToExternalClasses(input),
+                                input,
+                                (paraClasses) ->
+                                        
Arrays.stream(func.getParameterTypes(paraClasses))
+                                                
.map(TypeConversions::fromLegacyInfoToDataType)
+                                                .toArray(DataType[]::new));
+
+                if (foundMethod.isPresent()) {
+                    return Optional.of(argumentDataTypes);
+                } else {
+                    return callContext.fail(
+                            throwOnFailure, "Given parameters do not match any 
signature.");
+                }
+            }
+
+            @Override
+            public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+                return getSignatures(func, "eval");
+            }
+        };
+    }
+
+    private static Optional<Class<?>[]> getEvalMethodSignature(
+            ScalarFunction func, LogicalType[] expectedTypes) {
+        return getUserDefinedMethod(
+                        func,
+                        "eval",
+                        logicalTypesToExternalClasses(expectedTypes),
+                        expectedTypes,
+                        (paraClasses) ->
+                                
Arrays.stream(func.getParameterTypes(paraClasses))
+                                        
.map(TypeConversions::fromLegacyInfoToDataType)
+                                        .toArray(DataType[]::new))
+                .map(
+                        m ->
+                                getParamClassesConsiderVarArgs(
+                                        m.isVarArgs(),
+                                        m.getParameterTypes(),
+                                        expectedTypes.length));
+    }
+
+    private static Class<?>[] getParamClassesConsiderVarArgs(
+            boolean isVarArgs, Class<?>[] matchingSignature, int 
expectedLength) {
+        return IntStream.range(0, expectedLength)
+                .mapToObj(
+                        i -> {
+                            if (i < matchingSignature.length - 1) {
+                                return matchingSignature[i];
+                            } else if (isVarArgs) {
+                                return 
matchingSignature[matchingSignature.length - 1]
+                                        .getComponentType();
+                            } else {
+                                // last argument is not an array type
+                                return 
matchingSignature[matchingSignature.length - 1];
+                            }
+                        })
+                .toArray(Class<?>[]::new);
+    }
+
+    private static DataType getAccumulatorType(ImperativeAggregateFunction<?, 
?> func) {
+        final TypeInformation<?> accType = func.getAccumulatorType();
+        if (accType != null) {
+            return fromLegacyInfoToDataType(accType);
+        } else {
+            try {
+                return fromLegacyInfoToDataType(
+                        TypeExtractor.createTypeInfo(
+                                func, ImperativeAggregateFunction.class, 
func.getClass(), 1));
+            } catch (InvalidTypesException ite) {
+                throw new TableException(
+                        String.format(
+                                "Cannot infer generic type of %s}. You can 
override"
+                                        + " 
ImperativeAggregateFunction.getAccumulatorType()"
+                                        + " to specify the type.",
+                                func.getClass()),
+                        ite);
+            }
+        }
+    }
+
+    private static Class<?>[] logicalTypesToExternalClasses(LogicalType[] 
types) {
+        return Arrays.stream(types)
+                .map(
+                        t -> {
+                            if (t == null) {
+                                return null;
+                            } else {
+                                return TypeConversions.fromLogicalToDataType(t)
+                                        .getConversionClass();
+                            }
+                        })
+                .toArray(Class<?>[]::new);
+    }
+
+    private static Class<?>[] logicalTypesToInternalClasses(LogicalType[] 
types) {
+        return Arrays.stream(types)
+                .map(
+                        t -> {
+                            if (t == null) {
+                                return null;
+                            } else {
+                                return toInternalConversionClass(t);
+                            }
+                        })
+                .toArray(Class<?>[]::new);
+    }
+
+    private static boolean parameterClassEquals(Class<?> candidate, Class<?> 
expected) {
+        return candidate == null
+                || candidate == expected
+                || expected == Object.class
+                || candidate == Object.class
+                || // Special case when we don't know the type
+                expected.isPrimitive() && Primitives.wrap(expected) == 
candidate
+                || candidate == Date.class && (expected == Integer.class || 
expected == int.class)
+                || candidate == Time.class && (expected == Integer.class || 
expected == int.class)
+                || candidate == StringData.class && expected == String.class
+                || candidate == String.class && expected == StringData.class
+                || candidate == TimestampData.class && expected == 
LocalDateTime.class
+                || candidate == Timestamp.class && expected == 
TimestampData.class
+                || candidate == TimestampData.class && expected == 
Timestamp.class
+                || candidate == LocalDateTime.class && expected == 
TimestampData.class
+                || candidate == TimestampData.class && expected == 
Instant.class
+                || candidate == Instant.class && expected == 
TimestampData.class
+                || RowData.class.isAssignableFrom(candidate) && expected == 
Row.class
+                || candidate == Row.class && 
RowData.class.isAssignableFrom(expected)
+                || RowData.class.isAssignableFrom(candidate) && expected == 
RowData.class
+                || candidate == RowData.class && 
RowData.class.isAssignableFrom(expected)
+                || candidate == DecimalData.class && expected == 
BigDecimal.class
+                || candidate == BigDecimal.class && expected == 
DecimalData.class
+                || (candidate.isArray()
+                        && expected.isArray()
+                        && candidate.getComponentType() != null
+                        && expected.getComponentType() == Object.class);
+    }
+
+    private static boolean parameterDataTypeEquals(LogicalType internal, 
DataType parameterType) {
+        if (internal.is(LogicalTypeRoot.RAW)
+                && parameterType.getLogicalType().is(LogicalTypeRoot.RAW)) {
+            return 
TypeConversions.fromLogicalToDataType(internal).getConversionClass()
+                    == parameterType.getConversionClass();
+        } else {
+            // There is a special equal to GenericType. We need rewrite type 
extract to RowData
+            // etc...
+            return parameterType.getLogicalType() == internal
+                    || toInternalConversionClass(internal)
+                            == 
toInternalConversionClass(parameterType.getLogicalType());
+        }
+    }
+
+    private static Optional<Method> getUserDefinedMethod(
+            UserDefinedFunction function,
+            String methodName,
+            Class<?>[] methodSignature,
+            LogicalType[] internalTypes,
+            Function<Class<?>[], DataType[]> parameterTypesFunc) {
+        final List<Method> methods = checkAndExtractMethods(function, 
methodName);
+
+        final List<Method> filteredMethods =
+                methods.stream()
+                        .filter(
+                                cur -> {
+                                    final Class<?>[] parameterTypes = 
cur.getParameterTypes();
+                                    final DataType[] parameterDataTypes =
+                                            
parameterTypesFunc.apply(parameterTypes);
+                                    if (cur.isVarArgs()) {
+                                        return varArgsMethodMatch(
+                                                methodSignature,
+                                                internalTypes,
+                                                parameterTypes,
+                                                parameterDataTypes);
+
+                                    } else {
+                                        return methodMatch(
+                                                methodSignature,
+                                                internalTypes,
+                                                parameterTypes,
+                                                parameterDataTypes);
+                                    }
+                                })
+                        .collect(Collectors.toList());
+
+        // if there is a fixed method, compiler will call this method 
preferentially
+
+        final List<Method> found =
+                filteredMethods.stream()
+                        .sorted(Comparator.comparing(Method::isVarArgs, 
Boolean::compareTo))
+                        .filter(cur -> 
!Modifier.isVolatile(cur.getModifiers()))
+                        .collect(Collectors.toList());
+
+        final int foundCount = found.size();
+        if (foundCount == 1) {
+            return Optional.of(found.get(0));
+        } else if (foundCount > 1) {
+            if (Arrays.asList(methodSignature).contains(Object.class)) {
+                return Optional.of(found.get(0));
+            }
+
+            final List<Method> nonObjectParameterMethods =
+                    found.stream()
+                            .filter(
+                                    m ->
+                                            
!Arrays.asList(m.getParameterTypes())
+                                                    .contains(Object.class))
+                            .collect(Collectors.toList());
+
+            if (nonObjectParameterMethods.size() == 1) {
+                return Optional.of(nonObjectParameterMethods.get(0));
+            }
+
+            throw new ValidationException(
+                    String.format(
+                            "Found multiple '%s' methods which match the 
signature.", methodName));
+        }
+
+        return Optional.empty();
+    }
+
+    private static boolean methodMatch(
+            Class<?>[] methodSignature,
+            LogicalType[] internalTypes,
+            Class<?>[] parameterTypes,
+            DataType[] parameterDataTypes) {
+        // match parameters of signature to actual parameters
+        return methodSignature.length == parameterTypes.length
+                && IntStream.range(0, parameterTypes.length)
+                        .allMatch(
+                                i ->
+                                        
parameterClassEquals(methodSignature[i], parameterTypes[i])
+                                                || parameterDataTypeEquals(
+                                                        internalTypes[i], 
parameterDataTypes[i]));
+    }
+
+    private static boolean varArgsMethodMatch(
+            Class<?>[] methodSignature,
+            LogicalType[] internalTypes,
+            Class<?>[] parameterTypes,
+            DataType[] parameterDataTypes) {
+        if (methodSignature.length == 0 && parameterTypes.length == 1) {
+            return true;
+        } else {
+            return IntStream.range(0, methodSignature.length)
+                    .allMatch(
+                            i -> {
+                                if (i < parameterTypes.length - 1) {
+                                    return parameterClassEquals(
+                                                    methodSignature[i], 
parameterTypes[i])
+                                            || parameterDataTypeEquals(
+                                                    internalTypes[i], 
parameterDataTypes[i]);
+                                } else {
+                                    return parameterClassEquals(
+                                                    methodSignature[i],
+                                                    
parameterTypes[parameterTypes.length - 1]
+                                                            
.getComponentType())
+                                            || parameterDataTypeEquals(
+                                                    internalTypes[i], 
parameterDataTypes[i]);
+                                }
+                            });
+        }
+    }
+
+    private static List<Method> checkAndExtractMethods(
+            UserDefinedFunction function, String methodName) {
+        final List<Method> methods =
+                Arrays.stream(function.getClass().getMethods())
+                        .filter(
+                                m -> {
+                                    final int modifiers = m.getModifiers();
+                                    return m.getName().equals(methodName)
+                                            && Modifier.isPublic(modifiers)
+                                            && !Modifier.isAbstract(modifiers)
+                                            && !(function instanceof 
TableFunction
+                                                    && 
Modifier.isStatic(modifiers));
+                                })
+                        .collect(Collectors.toList());
+
+        if (methods.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Function class '%s' does not implement at least "
+                                    + "one method named '%s' which is public, 
not abstract and "
+                                    + "(in case of table functions) not 
static.",
+                            function.getClass().getCanonicalName(), 
methodName));
+        }
+
+        return methods;
+    }
+
+    private static List<Signature> getSignatures(UserDefinedFunction func, 
String methodName) {
+        return checkAndExtractMethods(func, methodName).stream()
+                .map(Method::getParameterTypes)
+                .map(
+                        sig ->
+                                Signature.of(
+                                        Arrays.stream(sig)
+                                                .map(s -> 
Signature.Argument.of(s.getSimpleName()))
+                                                .collect(Collectors.toList())))
+                .collect(Collectors.toList());
+    }
+
+    private LegacyUserDefinedFunctionInference() {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
index 5ae2752e164..fe3c8f9c3cc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunctionDefinition.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.util.Preconditions;
@@ -62,8 +61,12 @@ public final class ScalarFunctionDefinition implements 
FunctionDefinition {
 
     @Override
     public TypeInference getTypeInference(DataTypeFactory factory) {
-        throw new TableException(
-                "Functions implemented for the old type system are not 
supported.");
+        return TypeInference.newBuilder()
+                .inputTypeStrategy(
+                        
LegacyUserDefinedFunctionInference.getInputTypeStrategy(scalarFunction))
+                .outputTypeStrategy(
+                        
LegacyUserDefinedFunctionInference.getOutputTypeStrategy(scalarFunction))
+                .build();
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
index e8633863764..7a5015816c0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunctionDefinition.java
@@ -19,14 +19,16 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Objects;
 import java.util.Set;
 
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * A "marker" function definition of an user-defined table aggregate function 
that uses the old type
  * system stack.
@@ -77,8 +79,12 @@ public final class TableAggregateFunctionDefinition 
implements FunctionDefinitio
 
     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
-        throw new TableException(
-                "Functions implemented for the old type system are not 
supported.");
+        return TypeInference.newBuilder()
+                .inputTypeStrategy(
+                        
LegacyUserDefinedFunctionInference.getInputTypeStrategy(aggregateFunction))
+                .outputTypeStrategy(
+                        
TypeStrategies.explicit(fromLegacyInfoToDataType(resultTypeInfo)))
+                .build();
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
index bc540be441c..b9ccd161770 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunctionDefinition.java
@@ -19,14 +19,16 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Objects;
 import java.util.Set;
 
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * A "marker" function definition of an user-defined table function that uses 
the old type system
  * stack.
@@ -68,8 +70,11 @@ public final class TableFunctionDefinition implements 
FunctionDefinition {
 
     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
-        throw new TableException(
-                "Functions implemented for the old type system are not 
supported.");
+        return TypeInference.newBuilder()
+                .inputTypeStrategy(
+                        
LegacyUserDefinedFunctionInference.getInputTypeStrategy(tableFunction))
+                
.outputTypeStrategy(TypeStrategies.explicit(fromLegacyInfoToDataType(resultType)))
+                .build();
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
deleted file mode 100644
index 0882dc77d8c..00000000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.expressions;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.planner.typeutils.TypeCoercion;
-import org.apache.flink.table.planner.validate.ValidationFailure;
-import org.apache.flink.table.planner.validate.ValidationResult;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.TypeInferenceUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static 
org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toJava;
-import static 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/** Implementation of {@link PlannerTypeInferenceUtil}. */
-@Internal
-public final class PlannerTypeInferenceUtilImpl implements 
PlannerTypeInferenceUtil {
-
-    public static final PlannerTypeInferenceUtil INSTANCE = new 
PlannerTypeInferenceUtilImpl();
-
-    private static final PlannerExpressionConverter CONVERTER =
-            PlannerExpressionConverter.INSTANCE();
-
-    @Override
-    public TypeInferenceUtil.Result runTypeInference(
-            UnresolvedCallExpression unresolvedCall, List<ResolvedExpression> 
resolvedArgs) {
-        // We should not try to resolve the children again with the old type 
stack
-        // The arguments might have been resolved with the new stack already. 
In that case the
-        // resolution will fail.
-        unresolvedCall = unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
-        final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER);
-
-        if (plannerCall instanceof InputTypeSpec) {
-            return resolveWithCastedAssignment(
-                    unresolvedCall,
-                    resolvedArgs,
-                    toJava(((InputTypeSpec) plannerCall).expectedTypes()),
-                    plannerCall.resultType());
-        } else {
-            validateArguments(plannerCall);
-
-            final List<DataType> expectedArgumentTypes =
-                    resolvedArgs.stream()
-                            .map(ResolvedExpression::getOutputDataType)
-                            .collect(Collectors.toList());
-
-            return new TypeInferenceUtil.Result(
-                    expectedArgumentTypes,
-                    null,
-                    fromLegacyInfoToDataType(plannerCall.resultType()));
-        }
-    }
-
-    private TypeInferenceUtil.Result resolveWithCastedAssignment(
-            UnresolvedCallExpression unresolvedCall,
-            List<ResolvedExpression> args,
-            List<TypeInformation<?>> expectedTypes,
-            TypeInformation<?> resultType) {
-
-        final List<PlannerExpression> plannerArgs =
-                unresolvedCall.getChildren().stream()
-                        .map(e -> e.accept(CONVERTER))
-                        .collect(Collectors.toList());
-
-        final List<DataType> castedArgs =
-                IntStream.range(0, plannerArgs.size())
-                        .mapToObj(
-                                idx ->
-                                        castIfNeeded(
-                                                args.get(idx),
-                                                plannerArgs.get(idx),
-                                                expectedTypes.get(idx)))
-                        .collect(Collectors.toList());
-
-        return new TypeInferenceUtil.Result(castedArgs, null, 
fromLegacyInfoToDataType(resultType));
-    }
-
-    private void validateArguments(PlannerExpression plannerCall) {
-        if (!plannerCall.valid()) {
-            throw new ValidationException(
-                    getValidationErrorMessage(plannerCall)
-                            .orElse(
-                                    "Unexpected behavior, validation failed 
but can't get error messages!"));
-        }
-    }
-
-    /**
-     * Return the validation error message of this {@link PlannerExpression} 
or return the
-     * validation error message of it's children if it passes the validation. 
Return empty if all
-     * validation succeeded.
-     */
-    private Optional<String> getValidationErrorMessage(PlannerExpression 
plannerCall) {
-        ValidationResult validationResult = plannerCall.validateInput();
-        if (validationResult instanceof ValidationFailure) {
-            return Optional.of(((ValidationFailure) 
validationResult).message());
-        } else {
-            for (Expression plannerExpression : plannerCall.getChildren()) {
-                Optional<String> errorMessage =
-                        getValidationErrorMessage((PlannerExpression) 
plannerExpression);
-                if (errorMessage.isPresent()) {
-                    return errorMessage;
-                }
-            }
-        }
-        return Optional.empty();
-    }
-
-    private DataType castIfNeeded(
-            ResolvedExpression child,
-            PlannerExpression plannerChild,
-            TypeInformation<?> expectedType) {
-        TypeInformation<?> actualType = plannerChild.resultType();
-        if (actualType.equals(expectedType)) {
-            return child.getOutputDataType();
-        } else if (TypeCoercion.canSafelyCast(
-                fromTypeInfoToLogicalType(actualType), 
fromTypeInfoToLogicalType(expectedType))) {
-            return fromLegacyInfoToDataType(expectedType);
-        } else {
-            throw new ValidationException(
-                    String.format(
-                            "Incompatible type of argument: %s Expected: %s", 
child, expectedType));
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 48fe89d5303..59e44941a8e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -36,7 +36,6 @@ import org.apache.flink.table.planner.calcite._
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import 
org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
-import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
@@ -96,9 +95,6 @@ abstract class PlannerBase(
     classLoader: ClassLoader)
   extends Planner {
 
-  // temporary utility until we don't use planner expressions anymore
-  
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
-
   private var parserFactory: ParserFactory = _
   private var parser: Parser = _
   private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
deleted file mode 100644
index f226cff31a6..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/ExpressionBridge.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor}
-
-/** Bridges between API [[Expression]]s (for both Java and Scala) and final 
expression stack. */
-class ExpressionBridge[E <: Expression](finalVisitor: ExpressionVisitor[E]) {
-
-  def bridge(expression: Expression): E = {
-    // convert to final expressions
-    expression.accept(finalVisitor)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
deleted file mode 100644
index 42b1c9da118..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/InputTypeSpec.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.planner.validate._
-
-import scala.collection.mutable
-
-/** Expressions that have strict data type specification on its inputs. */
-trait InputTypeSpec extends PlannerExpression {
-
-  /**
-   * Input type specification for each child.
-   *
-   * For example, [[Power]] expecting both of the children be of double type 
should use:
-   * {{{
-   *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: 
DOUBLE_TYPE_INFO :: Nil
-   * }}}
-   *
-   * Inputs that don't match the expected type will be safely casted to a 
higher type. Therefore,
-   * use the decimal type with caution as all numeric types would be casted to 
a very inefficient
-   * type.
-   */
-  private[flink] def expectedTypes: Seq[TypeInformation[_]]
-
-  override private[flink] def validateInput(): ValidationResult = {
-    val typeMismatches = mutable.ArrayBuffer.empty[String]
-
-    if (expectedTypes.size != children.size) {
-      return ValidationFailure(
-        s"""|$this fails on input type size checking: expected types 
size[${expectedTypes.size}].
-            |Operands types size[${children.size}].
-            |""".stripMargin)
-    }
-
-    children.zip(expectedTypes).zipWithIndex.foreach {
-      case ((e, tpe), i) =>
-        if (e.resultType != tpe) {
-          typeMismatches += s"expecting $tpe on ${i}th input, get 
${e.resultType}"
-        }
-    }
-    if (typeMismatches.isEmpty) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(
-        s"""|$this fails on input type checking: 
${typeMismatches.mkString("[", ", ", "]")}.
-            |Operand should be casted to proper type
-            |""".stripMargin)
-    }
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
deleted file mode 100644
index 89054e53f46..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpression.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.expressions.{Expression, ExpressionVisitor}
-import org.apache.flink.table.planner.validate.{ValidationResult, 
ValidationSuccess}
-
-import _root_.scala.collection.JavaConversions._
-
-import java.util
-
-abstract class PlannerExpression extends TreeNode[PlannerExpression] with 
Expression {
-
-  /**
-   * Returns the [[TypeInformation]] for evaluating this expression. It is 
sometimes not available
-   * until the expression is valid.
-   */
-  private[flink] def resultType: TypeInformation[_]
-
-  /** One pass validation of the expression tree in post order. */
-  private[flink] lazy val valid: Boolean = childrenValid && 
validateInput().isSuccess
-
-  private[flink] def childrenValid: Boolean = children.forall(_.valid)
-
-  /**
-   * Check input data types, inputs number or other properties specified by 
this expression. Return
-   * `ValidationSuccess` if it pass the check, or `ValidationFailure` with 
supplement message
-   * explaining the error. Note: we should only call this method until 
`childrenValid == true`
-   */
-  private[flink] def validateInput(): ValidationResult = ValidationSuccess
-
-  private[flink] def checkEquals(other: PlannerExpression): Boolean = {
-    if (this.getClass != other.getClass) {
-      false
-    } else {
-      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
-        elements1.length == elements2.length && 
elements1.zip(elements2).forall {
-          case (e1: PlannerExpression, e2: PlannerExpression) => 
e1.checkEquals(e2)
-          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
-          case (i1, i2) => i1 == i2
-        }
-      }
-      val elements1 = this.productIterator.toSeq
-      val elements2 = other.productIterator.toSeq
-      checkEquality(elements1, elements2)
-    }
-  }
-
-  override def asSummaryString(): String = toString
-
-  override def getChildren: util.List[Expression] = children
-
-  override def accept[R](visitor: ExpressionVisitor[R]): R = 
visitor.visit(this)
-}
-
-abstract class BinaryExpression extends PlannerExpression {
-  private[flink] def left: PlannerExpression
-  private[flink] def right: PlannerExpression
-  private[flink] def children = Seq(left, right)
-}
-
-abstract class UnaryExpression extends PlannerExpression {
-  private[flink] def child: PlannerExpression
-  private[flink] def children = Seq(child)
-}
-
-abstract class LeafExpression extends PlannerExpression {
-  private[flink] val children = Nil
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
deleted file mode 100644
index bf3c13698f8..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions._
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
-import 
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, 
SYMBOL}
-import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasLength, 
hasPrecision, hasScale}
-
-import _root_.scala.collection.JavaConverters._
-
-/** Visitor implementation for converting [[Expression]]s to 
[[PlannerExpression]]s. */
-class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExpression] {
-
-  override def visit(call: CallExpression): PlannerExpression = {
-    val definition = call.getFunctionDefinition
-    translateCall(
-      definition,
-      call.getChildren.asScala,
-      () =>
-        definition match {
-          case ROW | ARRAY | MAP => 
ApiResolvedExpression(call.getOutputDataType)
-          case _ =>
-            if (
-              definition.getKind == FunctionKind.AGGREGATE ||
-              definition.getKind == FunctionKind.TABLE_AGGREGATE
-            ) {
-              ApiResolvedAggregateCallExpression(call)
-            } else {
-              ApiResolvedExpression(call.getOutputDataType)
-            }
-        }
-    )
-  }
-
-  override def visit(unresolvedCall: UnresolvedCallExpression): 
PlannerExpression = {
-    val definition = unresolvedCall.getFunctionDefinition
-    translateCall(
-      definition,
-      unresolvedCall.getChildren.asScala,
-      () => throw new TableException(s"Unsupported function definition: 
$definition"))
-  }
-
-  private def translateCall(
-      func: FunctionDefinition,
-      children: Seq[Expression],
-      unknownFunctionHandler: () => PlannerExpression): PlannerExpression = {
-
-    val args = children.map(_.accept(this))
-
-    func match {
-      case sfd: ScalarFunctionDefinition =>
-        val call = PlannerScalarFunctionCall(sfd.getScalarFunction, args)
-        // it configures underlying state
-        call.validateInput()
-        call
-
-      case tfd: TableFunctionDefinition =>
-        PlannerTableFunctionCall(tfd.toString, tfd.getTableFunction, args, 
tfd.getResultType)
-
-      case afd: AggregateFunctionDefinition =>
-        AggFunctionCall(
-          afd.getAggregateFunction,
-          afd.getResultTypeInfo,
-          afd.getAccumulatorTypeInfo,
-          args)
-
-      case tafd: TableAggregateFunctionDefinition =>
-        AggFunctionCall(
-          tafd.getTableAggregateFunction,
-          tafd.getResultTypeInfo,
-          tafd.getAccumulatorTypeInfo,
-          args)
-
-      case _: FunctionDefinition =>
-        unknownFunctionHandler()
-    }
-  }
-
-  override def visit(literal: ValueLiteralExpression): PlannerExpression = {
-    if (literal.getOutputDataType.getLogicalType.is(SYMBOL)) {
-      val plannerSymbol = 
getSymbol(literal.getValueAs(classOf[TableSymbol]).get())
-      return SymbolPlannerExpression(plannerSymbol)
-    }
-
-    val typeInfo = getLiteralTypeInfo(literal)
-    if (literal.isNull) {
-      Null(typeInfo)
-    } else {
-      Literal(literal.getValueAs(typeInfo.getTypeClass).get(), typeInfo)
-    }
-  }
-
-  /** This method makes the planner more lenient for new data types defined 
for literals. */
-  private def getLiteralTypeInfo(literal: ValueLiteralExpression): 
TypeInformation[_] = {
-    val logicalType = literal.getOutputDataType.getLogicalType
-
-    if (logicalType.is(DECIMAL)) {
-      if (literal.isNull) {
-        return Types.BIG_DEC
-      }
-      val value = literal.getValueAs(classOf[java.math.BigDecimal]).get()
-      if (hasPrecision(logicalType, value.precision()) && 
hasScale(logicalType, value.scale())) {
-        return Types.BIG_DEC
-      }
-    } else if (logicalType.is(CHAR)) {
-      if (literal.isNull) {
-        return Types.STRING
-      }
-      val value = literal.getValueAs(classOf[java.lang.String]).get()
-      if (hasLength(logicalType, value.length)) {
-        return Types.STRING
-      }
-    }
-
-    fromDataTypeToTypeInfo(literal.getOutputDataType)
-  }
-
-  private def getSymbol(symbol: TableSymbol): PlannerSymbol = symbol match {
-    case TimeIntervalUnit.MILLENNIUM => PlannerTimeIntervalUnit.MILLENNIUM
-    case TimeIntervalUnit.CENTURY => PlannerTimeIntervalUnit.CENTURY
-    case TimeIntervalUnit.DECADE => PlannerTimeIntervalUnit.DECADE
-    case TimeIntervalUnit.YEAR => PlannerTimeIntervalUnit.YEAR
-    case TimeIntervalUnit.YEAR_TO_MONTH => 
PlannerTimeIntervalUnit.YEAR_TO_MONTH
-    case TimeIntervalUnit.QUARTER => PlannerTimeIntervalUnit.QUARTER
-    case TimeIntervalUnit.MONTH => PlannerTimeIntervalUnit.MONTH
-    case TimeIntervalUnit.WEEK => PlannerTimeIntervalUnit.WEEK
-    case TimeIntervalUnit.DAY => PlannerTimeIntervalUnit.DAY
-    case TimeIntervalUnit.DAY_TO_HOUR => PlannerTimeIntervalUnit.DAY_TO_HOUR
-    case TimeIntervalUnit.DAY_TO_MINUTE => 
PlannerTimeIntervalUnit.DAY_TO_MINUTE
-    case TimeIntervalUnit.DAY_TO_SECOND => 
PlannerTimeIntervalUnit.DAY_TO_SECOND
-    case TimeIntervalUnit.HOUR => PlannerTimeIntervalUnit.HOUR
-    case TimeIntervalUnit.SECOND => PlannerTimeIntervalUnit.SECOND
-    case TimeIntervalUnit.HOUR_TO_MINUTE => 
PlannerTimeIntervalUnit.HOUR_TO_MINUTE
-    case TimeIntervalUnit.HOUR_TO_SECOND => 
PlannerTimeIntervalUnit.HOUR_TO_SECOND
-    case TimeIntervalUnit.MINUTE => PlannerTimeIntervalUnit.MINUTE
-    case TimeIntervalUnit.MINUTE_TO_SECOND => 
PlannerTimeIntervalUnit.MINUTE_TO_SECOND
-    case TimeIntervalUnit.MILLISECOND => PlannerTimeIntervalUnit.MILLISECOND
-    case TimeIntervalUnit.MICROSECOND => PlannerTimeIntervalUnit.MICROSECOND
-    case TimeIntervalUnit.NANOSECOND => PlannerTimeIntervalUnit.NANOSECOND
-    case TimeIntervalUnit.EPOCH => PlannerTimeIntervalUnit.EPOCH
-    case TimePointUnit.YEAR => PlannerTimePointUnit.YEAR
-    case TimePointUnit.MONTH => PlannerTimePointUnit.MONTH
-    case TimePointUnit.DAY => PlannerTimePointUnit.DAY
-    case TimePointUnit.HOUR => PlannerTimePointUnit.HOUR
-    case TimePointUnit.MINUTE => PlannerTimePointUnit.MINUTE
-    case TimePointUnit.SECOND => PlannerTimePointUnit.SECOND
-    case TimePointUnit.QUARTER => PlannerTimePointUnit.QUARTER
-    case TimePointUnit.WEEK => PlannerTimePointUnit.WEEK
-    case TimePointUnit.MILLISECOND => PlannerTimePointUnit.MILLISECOND
-    case TimePointUnit.MICROSECOND => PlannerTimePointUnit.MICROSECOND
-
-    case _ =>
-      throw new TableException("Unsupported symbol: " + symbol)
-  }
-
-  override def visit(fieldReference: FieldReferenceExpression): 
PlannerExpression = {
-    PlannerResolvedFieldReference(
-      fieldReference.getName,
-      fromDataTypeToTypeInfo(fieldReference.getOutputDataType))
-  }
-
-  override def visit(fieldReference: UnresolvedReferenceExpression): 
PlannerExpression = {
-    UnresolvedFieldReference(fieldReference.getName)
-  }
-
-  override def visit(typeLiteral: TypeLiteralExpression): PlannerExpression = {
-    ApiResolvedExpression(typeLiteral.getOutputDataType)
-  }
-
-  override def visit(tableRef: TableReferenceExpression): PlannerExpression = {
-    TableReference(
-      tableRef.asInstanceOf[TableReferenceExpression].getName,
-      tableRef.asInstanceOf[TableReferenceExpression].getQueryOperation
-    )
-  }
-
-  override def visit(local: LocalReferenceExpression): PlannerExpression =
-    PlannerLocalReference(local.getName, 
fromDataTypeToTypeInfo(local.getOutputDataType))
-
-  override def visit(lookupCall: LookupCallExpression): PlannerExpression =
-    throw new TableException("Unsupported function call: " + lookupCall)
-
-  override def visit(sqlCall: SqlCallExpression): PlannerExpression =
-    throw new TableException("Unsupported function call: " + sqlCall)
-
-  override def visit(other: ResolvedExpression): PlannerExpression = 
visitNonApiExpression(other)
-
-  override def visitNonApiExpression(other: Expression): PlannerExpression = {
-    other match {
-      // already converted planner expressions will pass this visitor without 
modification
-      case plannerExpression: PlannerExpression => plannerExpression
-      case expr: RexNodeExpression => RexPlannerExpression(expr.getRexNode)
-      case _ =>
-        throw new TableException("Unrecognized expression: " + other)
-    }
-  }
-}
-
-object PlannerExpressionConverter {
-  val INSTANCE: PlannerExpressionConverter = new PlannerExpressionConverter
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
deleted file mode 100644
index 9bf173ddcf0..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionUtils.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-object PlannerExpressionUtils {
-
-  private[flink] def isTimeIntervalLiteral(expr: PlannerExpression): Boolean = 
expr match {
-    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
-    case _ => false
-  }
-
-  private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean = 
expr match {
-    case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
-    case _ => false
-  }
-
-  private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr 
match {
-    case r: PlannerResolvedFieldReference if 
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
-      true
-    case _ => false
-  }
-
-  private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: PlannerResolvedFieldReference
-        if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
-      true
-    case _ => false
-  }
-
-  private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: PlannerResolvedFieldReference
-        if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
-      true
-    case _ => false
-  }
-
-  private[flink] def toTime(expr: PlannerExpression): FlinkTime = expr match {
-    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-      FlinkTime.milliseconds(value)
-    case _ => throw new IllegalArgumentException()
-  }
-
-  private[flink] def toLong(expr: PlannerExpression): Long = expr match {
-    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
-    case _ => throw new IllegalArgumentException()
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
deleted file mode 100644
index 182198a45e0..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/TreeNode.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
-
-/** Generic base class for trees that can be transformed and traversed. */
-abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
-
-  /**
-   * List of child nodes that should be considered when doing transformations. 
Other values in the
-   * Product will not be transformed, only handed through.
-   */
-  private[flink] def children: Seq[A]
-
-  /** Tests for equality by first testing for reference equality. */
-  private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) 
|| this == other
-
-  /** Do tree transformation in post order. */
-  private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
-    def childrenTransform(rule: PartialFunction[A, A]): A = {
-      var changed = false
-      val newArgs = productIterator.map {
-        case arg: TreeNode[_] if children.contains(arg) =>
-          val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
-          if (!(newChild.fastEquals(arg))) {
-            changed = true
-            newChild
-          } else {
-            arg
-          }
-        case args: Traversable[_] =>
-          args.map {
-            case arg: TreeNode[_] if children.contains(arg) =>
-              val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
-              if (!(newChild.fastEquals(arg))) {
-                changed = true
-                newChild
-              } else {
-                arg
-              }
-            case other => other
-          }
-        case nonChild: AnyRef => nonChild
-        case null => null
-      }.toArray
-      if (changed) makeCopy(newArgs) else this
-    }
-
-    val afterChildren = childrenTransform(rule)
-    if (afterChildren.fastEquals(this)) {
-      rule.applyOrElse(this, identity[A])
-    } else {
-      rule.applyOrElse(afterChildren, identity[A])
-    }
-  }
-
-  /** Runs the given function first on the node and then recursively on all 
its children. */
-  private[flink] def preOrderVisit(f: A => Unit): Unit = {
-    f(this)
-    children.foreach(_.preOrderVisit(f))
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used 
during transformation if
-   * children change.
-   */
-  private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
-    val ctors = getClass.getConstructors.filter(_.getParameterCount > 0)
-    if (ctors.isEmpty) {
-      throw new RuntimeException(s"No valid constructor for 
${getClass.getSimpleName}")
-    }
-
-    val defaultCtor = ctors
-      .find {
-        ctor =>
-          if (ctor.getParameterCount != newArgs.length) {
-            false
-          } else if (newArgs.contains(null)) {
-            false
-          } else {
-            val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
-            TypeInfoCheckUtils.isAssignable(argsClasses, 
ctor.getParameterTypes)
-          }
-      }
-      .getOrElse(ctors.maxBy(_.getParameterCount))
-
-    try {
-      defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
-    } catch {
-      case e: Throwable =>
-        throw new RuntimeException(s"Fail to copy tree node 
${getClass.getName}.", e)
-    }
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
deleted file mode 100644
index d3c19b8c4a1..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.MultisetTypeInfo
-import org.apache.flink.table.expressions.CallExpression
-import org.apache.flink.table.functions.ImperativeAggregateFunction
-import org.apache.flink.table.planner.calcite.FlinkTypeSystem
-import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
-import org.apache.flink.table.planner.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
-import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo,
 fromTypeInfoToLogicalType}
-import org.apache.flink.table.types.utils.TypeConversions
-import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
-sealed abstract class Aggregation extends PlannerExpression {
-
-  override def toString = s"Aggregate"
-
-}
-
-/**
- * Wrapper for call expressions resolved already in the API with the new type 
inference stack.
- * Separate from [[ApiResolvedExpression]] because others' expressions 
validation logic check for
- * the [[Aggregation]] trait.
- */
-case class ApiResolvedAggregateCallExpression(resolvedCall: CallExpression) 
extends Aggregation {
-
-  private[flink] val children = Nil
-
-  override private[flink] def resultType: TypeInformation[_] = TypeConversions
-    .fromDataTypeToLegacyInfo(resolvedCall.getOutputDataType)
-}
-
-/** Expression for calling a user-defined (table)aggregate function. */
-case class AggFunctionCall(
-    aggregateFunction: ImperativeAggregateFunction[_, _],
-    resultTypeInfo: TypeInformation[_],
-    accTypeInfo: TypeInformation[_],
-    args: Seq[PlannerExpression])
-  extends Aggregation {
-
-  override private[flink] def children: Seq[PlannerExpression] = args
-
-  override def resultType: TypeInformation[_] = resultTypeInfo
-
-  override def validateInput(): ValidationResult = {
-    val signature = children.map(_.resultType)
-    // look for a signature that matches the input types
-    val foundSignature =
-      getAccumulateMethodSignature(aggregateFunction, 
signature.map(fromTypeInfoToLogicalType))
-    if (foundSignature.isEmpty) {
-      ValidationFailure(
-        s"Given parameters do not match any signature. \n" +
-          s"Actual: 
${signatureToString(signature.map(fromLegacyInfoToDataType))} \n" +
-          s"Expected: ${getMethodSignatures(aggregateFunction, "accumulate")
-              .map(_.drop(1))
-              .map(signatureToString)
-              .sorted // make sure order to verify error messages in tests
-              .mkString(", ")}")
-    } else {
-      ValidationSuccess
-    }
-  }
-
-  override def toString: String = 
s"${aggregateFunction.getClass.getSimpleName}($args)"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
deleted file mode 100644
index a6401076759..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, 
Types}
-import org.apache.flink.table.expressions.CallExpression
-import org.apache.flink.table.functions._
-import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.planner.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
-import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
-import 
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions
-import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-/**
- * Wrapper for expressions that have been resolved already in the API with the 
new type inference
- * stack.
- */
-case class ApiResolvedExpression(resolvedDataType: DataType) extends 
LeafExpression {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    TypeConversions.fromDataTypeToLegacyInfo(resolvedDataType)
-}
-
-/**
- * Over call with unresolved alias for over window.
- *
- * @param agg
- *   The aggregation of the over call.
- * @param alias
- *   The alias of the referenced over window.
- */
-case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
-  extends PlannerExpression {
-
-  override private[flink] def validateInput() =
-    ValidationFailure(s"Over window with alias $alias could not be resolved.")
-
-  override private[flink] def resultType = agg.resultType
-
-  override private[flink] def children = Seq()
-}
-
-/**
- * Expression for calling a user-defined scalar functions.
- *
- * @param scalarFunction
- *   scalar function to be called (might be overloaded)
- * @param parameters
- *   actual parameters that determine target evaluation method
- */
-case class PlannerScalarFunctionCall(
-    scalarFunction: ScalarFunction,
-    parameters: Seq[PlannerExpression])
-  extends PlannerExpression {
-
-  private var signature: Array[LogicalType] = _
-
-  override private[flink] def children: Seq[PlannerExpression] = parameters
-
-  override def toString =
-    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", 
")})"
-
-  override private[flink] def resultType =
-    fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(scalarFunction, 
signature))
-
-  override private[flink] def validateInput(): ValidationResult = {
-    signature = 
children.map(_.resultType).map(fromTypeInfoToLogicalType).toArray
-    // look for a signature that matches the input types
-    val foundSignature = getEvalMethodSignatureOption(scalarFunction, 
signature)
-    if (foundSignature.isEmpty) {
-      ValidationFailure(
-        s"Given parameters do not match any signature. \n" +
-          s"Actual: 
${signatureToString(signature.map(fromLogicalTypeToDataType))} \n" +
-          s"Expected: ${signaturesToString(scalarFunction, "eval")}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-/**
- * Expression for calling a user-defined table function with actual parameters.
- *
- * @param functionName
- *   function name
- * @param tableFunction
- *   user-defined table function
- * @param parameters
- *   actual parameters of function
- * @param resultType
- *   type information of returned table
- */
-case class PlannerTableFunctionCall(
-    functionName: String,
-    tableFunction: TableFunction[_],
-    parameters: Seq[PlannerExpression],
-    resultType: TypeInformation[_])
-  extends PlannerExpression {
-
-  override private[flink] def children: Seq[PlannerExpression] = parameters
-
-  override def validateInput(): ValidationResult = {
-    // look for a signature that matches the input types
-    val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType)
-    val foundMethod = getUserDefinedMethod(tableFunction, "eval", signature)
-    if (foundMethod.isEmpty) {
-      ValidationFailure(
-        s"Given parameters of function '$functionName' do not match any 
signature. \n" +
-          s"Actual: ${signatureToString(signature)} \n" +
-          s"Expected: ${signaturesToString(tableFunction, "eval")}")
-    } else {
-      ValidationSuccess
-    }
-  }
-
-  override def toString =
-    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
deleted file mode 100644
index c8aa491a6ba..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api._
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory._
-import org.apache.flink.table.planner.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
-import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-
-import org.apache.calcite.rex.RexNode
-
-trait NamedExpression extends PlannerExpression {
-  private[flink] def name: String
-  private[flink] def toAttribute: Attribute
-}
-
-abstract class Attribute extends LeafExpression with NamedExpression {
-  override private[flink] def toAttribute: Attribute = this
-
-  private[flink] def withName(newName: String): Attribute
-}
-
-/** Dummy wrapper for expressions that were converted to RexNode in a 
different way. */
-case class RexPlannerExpression(private[flink] val rexNode: RexNode) extends 
LeafExpression {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    fromLogicalTypeToTypeInfo(FlinkTypeFactory.toLogicalType(rexNode.getType))
-}
-
-case class UnresolvedFieldReference(name: String) extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute =
-    UnresolvedFieldReference(newName)
-
-  override private[flink] def resultType: TypeInformation[_] =
-    throw new UnresolvedException(s"Calling resultType on ${this.getClass}.")
-
-  override private[flink] def validateInput(): ValidationResult =
-    ValidationFailure(s"Unresolved reference $name.")
-}
-
-case class PlannerResolvedFieldReference(name: String, resultType: 
TypeInformation[_])
-  extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) {
-      this
-    } else {
-      PlannerResolvedFieldReference(newName, resultType)
-    }
-  }
-}
-
-case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = 
None) extends Attribute {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    tpe.getOrElse(throw new UnresolvedException("Could not resolve type of 
referenced window."))
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) {
-      this
-    } else {
-      throw new ValidationException("Cannot rename window reference.")
-    }
-  }
-
-  override def toString: String = s"'$name"
-}
-
-case class TableReference(name: String, tableOperation: QueryOperation)
-  extends LeafExpression
-  with NamedExpression {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    throw new UnresolvedException(s"Table reference '$name' has no result 
type.")
-
-  override private[flink] def toAttribute =
-    throw new UnsupportedOperationException(s"A table reference '$name' can 
not be an attribute.")
-
-  override def toString: String = s"$name"
-}
-
-/** Expression to access the timestamp of a StreamRecord. */
-case class StreamRecordTimestamp() extends LeafExpression {
-
-  override private[flink] def resultType = Types.LONG
-}
-
-/**
- * Special reference which represent a local field, such as aggregate buffers 
or constants. We are
- * stored as class members, so the field can be referenced directly. We should 
use an unique name to
- * locate the field.
- */
-case class PlannerLocalReference(name: String, resultType: TypeInformation[_]) 
extends Attribute {
-
-  override def toString = s"'$name"
-
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName == name) this
-    else PlannerLocalReference(newName, resultType)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
deleted file mode 100644
index b1dfacba354..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/literals.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, 
SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime => jLocalTime}
-import java.util.{Calendar, TimeZone}
-
-object Literal {
-  private[flink] val UTC = TimeZone.getTimeZone("UTC")
-
-  private[flink] def apply(l: Any): Literal = l match {
-    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
-    case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
-    case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
-    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
-    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-    case javaDec: java.math.BigDecimal => Literal(javaDec, 
BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case scalaDec: scala.math.BigDecimal =>
-      Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
-    case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
-    case sqlTimestamp: Timestamp => Literal(sqlTimestamp, 
SqlTimeTypeInfo.TIMESTAMP)
-    case localDate: LocalDate => Literal(localDate, 
LocalTimeTypeInfo.LOCAL_DATE)
-    case localTime: jLocalTime => Literal(localTime, 
LocalTimeTypeInfo.LOCAL_TIME)
-    case localDateTime: LocalDateTime => Literal(localDateTime, 
LocalTimeTypeInfo.LOCAL_DATE_TIME)
-  }
-}
-
-case class Literal(value: Any, resultType: TypeInformation[_]) extends 
LeafExpression {
-  override def toString: String = resultType match {
-    case _: BasicTypeInfo[_] => value.toString
-    case _ @SqlTimeTypeInfo.DATE => value.toString + ".toDate"
-    case _ @SqlTimeTypeInfo.TIME => value.toString + ".toTime"
-    case _ @SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
-    case _ @TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
-    case _ @TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
-    case _ => s"Literal($value, $resultType)"
-  }
-
-  /**
-   * Convert a Date value to a Calendar. Calcite's fromCalendarField functions 
use the Calendar.get
-   * methods, so the raw values of the individual fields are preserved when 
converted to the String
-   * formats.
-   *
-   * @return
-   *   get the Calendar value
-   */
-  private def valueAsCalendar: Calendar = {
-    val date = value.asInstanceOf[java.util.Date]
-    val cal = Calendar.getInstance
-    cal.setTime(date)
-    cal
-  }
-}
-
-@deprecated(
-  "Use nullOf(TypeInformation) instead. It is available through the implicit 
Scala DSL.",
-  "1.8.0")
-case class Null(resultType: TypeInformation[_]) extends LeafExpression {
-  override def toString = s"null"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
deleted file mode 100644
index 2147aaa0c71..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/symbols.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.utils.DateTimeUtils.{TimeUnit, TimeUnitRange}
-
-import org.apache.calcite.sql.fun.SqlTrimFunction
-
-import scala.language.{existentials, implicitConversions}
-
-/** General expression class to represent a symbol. */
-case class SymbolPlannerExpression(symbol: PlannerSymbol) extends 
LeafExpression {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    throw new UnsupportedOperationException("This should not happen. A symbol 
has no result type.")
-
-  def toExpr: SymbolPlannerExpression = this // triggers implicit conversion
-
-  override def toString: String = s"${symbol.symbols}.${symbol.name}"
-
-}
-
-/** Symbol that wraps a Calcite symbol in form of a Java enum. */
-trait PlannerSymbol {
-  def symbols: PlannerSymbols
-  def name: String
-  def enum: Enum[_]
-}
-
-/** Enumeration of symbols. */
-abstract class PlannerSymbols extends Enumeration {
-
-  class PlannerSymbolValue(e: Enum[_]) extends Val(e.name()) with 
PlannerSymbol {
-    override def symbols: PlannerSymbols = PlannerSymbols.this
-
-    override def enum: Enum[_] = e
-
-    override def name: String = toString()
-  }
-
-  final protected def Value(enum: Enum[_]): PlannerSymbolValue = new 
PlannerSymbolValue(enum)
-
-  implicit def symbolToExpression(symbol: PlannerSymbolValue): 
SymbolPlannerExpression =
-    SymbolPlannerExpression(symbol)
-
-}
-
-/** Units for working with time intervals. */
-object PlannerTimeIntervalUnit extends PlannerSymbols {
-
-  type PlannerTimeIntervalUnit = PlannerSymbolValue
-
-  val MILLENNIUM = Value(TimeUnitRange.MILLENNIUM)
-  val CENTURY = Value(TimeUnitRange.CENTURY)
-  val DECADE = Value(TimeUnitRange.DECADE)
-  val YEAR = Value(TimeUnitRange.YEAR)
-  val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
-  val QUARTER = Value(TimeUnitRange.QUARTER)
-  val MONTH = Value(TimeUnitRange.MONTH)
-  val WEEK = Value(TimeUnitRange.WEEK)
-  val DAY = Value(TimeUnitRange.DAY)
-  val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
-  val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
-  val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
-  val HOUR = Value(TimeUnitRange.HOUR)
-  val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
-  val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
-  val MINUTE = Value(TimeUnitRange.MINUTE)
-  val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
-  val SECOND = Value(TimeUnitRange.SECOND)
-  val MILLISECOND = Value(TimeUnitRange.MILLISECOND)
-  val MICROSECOND = Value(TimeUnitRange.MICROSECOND)
-  val NANOSECOND = Value(TimeUnitRange.NANOSECOND)
-  val EPOCH = Value(TimeUnitRange.EPOCH)
-
-}
-
-/** Units for working with time points. */
-object PlannerTimePointUnit extends PlannerSymbols {
-
-  type PlannerTimePointUnit = PlannerSymbolValue
-
-  val YEAR = Value(TimeUnit.YEAR)
-  val MONTH = Value(TimeUnit.MONTH)
-  val DAY = Value(TimeUnit.DAY)
-  val HOUR = Value(TimeUnit.HOUR)
-  val MINUTE = Value(TimeUnit.MINUTE)
-  val SECOND = Value(TimeUnit.SECOND)
-  val QUARTER = Value(TimeUnit.QUARTER)
-  val WEEK = Value(TimeUnit.WEEK)
-  val MILLISECOND = Value(TimeUnit.MILLISECOND)
-  val MICROSECOND = Value(TimeUnit.MICROSECOND)
-
-}
-
-/** Modes for trimming strings. */
-object PlannerTrimMode extends PlannerSymbols {
-
-  type PlannerTrimMode = PlannerSymbolValue
-
-  val BOTH = Value(SqlTrimFunction.Flag.BOTH)
-  val LEADING = Value(SqlTrimFunction.Flag.LEADING)
-  val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
-
-}

Reply via email to