This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ebf50c4  [FLINK-22378][table] Derive type of SOURCE_WATERMARK() from 
time attribute
ebf50c4 is described below

commit ebf50c48ccc5e93d58a21e825a8542165a3fc129
Author: Timo Walther <[email protected]>
AuthorDate: Thu Apr 22 17:28:38 2021 +0200

    [FLINK-22378][table] Derive type of SOURCE_WATERMARK() from time attribute
    
    This closes #15730.
---
 .../table/planner/delegation/hive/HiveParser.java  |  6 +-
 .../planner/delegation/hive/HiveParserFactory.java |  6 --
 .../table/api/internal/TableEnvironmentImpl.java   |  6 +-
 .../flink/table/catalog/DefaultSchemaResolver.java | 25 +++++---
 .../org/apache/flink/table/delegation/Parser.java  | 11 +++-
 .../expressions/resolver/ExpressionResolver.java   | 18 ++++++
 .../resolver/SqlExpressionResolver.java            |  8 ++-
 .../resolver/rules/ResolveSqlCallRule.java         | 45 +++++++++++---
 .../expressions/resolver/rules/ResolverRule.java   |  4 ++
 .../catalog/CatalogBaseTableResolutionTest.java    |  6 +-
 .../flink/table/catalog/SchemaResolutionTest.java  | 30 +++++----
 .../resolver/ExpressionResolverTest.java           |  2 +-
 .../utils/ValuesOperationTreeBuilderTest.java      |  2 +-
 .../flink/table/utils/ExpressionResolverMocks.java |  2 +-
 .../org/apache/flink/table/utils/ParserMock.java   |  8 ++-
 .../table/connector/sink/DynamicTableSink.java     |  7 +--
 .../functions/BuiltInFunctionDefinitions.java      |  2 +-
 .../table/types/inference/TypeStrategies.java      | 16 +++++
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 17 ++++++
 .../planner/calcite/SqlExprToRexConverter.java     |  3 +
 .../calcite/SqlExprToRexConverterFactory.java      | 19 +++++-
 .../planner/calcite/SqlExprToRexConverterImpl.java | 19 ++++--
 .../planner/delegation/DefaultParserFactory.java   |  7 +--
 .../flink/table/planner/delegation/ParserImpl.java | 17 +++---
 .../table/planner/delegation/PlannerContext.java   | 55 +++++++++++++----
 .../expressions/CallExpressionResolver.java        |  2 +-
 .../inference/TypeInferenceReturnInference.java    | 20 +++++-
 .../plan/abilities/source/FilterPushDownSpec.java  |  2 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   | 38 +++++++-----
 .../table/planner/delegation/PlannerBase.scala     |  7 +--
 .../plan/schema/LegacyCatalogSourceTable.scala     |  4 +-
 .../table/planner/delegation/ParserImplTest.java   |  4 +-
 .../operations/SqlToOperationConverterTest.java    | 18 +++---
 .../runtime/stream/sql/DataStreamJavaITCase.java   | 71 ++++++++++++++++++++--
 .../flink/table/planner/utils/PlannerMocks.java    |  4 +-
 .../codegen/WatermarkGeneratorCodeGenTest.scala    | 26 +-------
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  2 +-
 .../org/apache/flink/table/planner/ParserImpl.java |  8 ++-
 .../flink/table/api/internal/TableEnvImpl.scala    |  6 +-
 .../operators/sink/OutputConversionOperator.java   |  2 +
 40 files changed, 390 insertions(+), 165 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index b231390..b2218c9 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.delegation.hive;
 
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -36,7 +35,6 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import 
org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseException;
@@ -80,7 +78,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 /** A Parser that uses Hive's planner to parse a statement. */
@@ -176,13 +173,12 @@ public class HiveParser extends ParserImpl {
             CatalogManager catalogManager,
             Supplier<FlinkPlannerImpl> validatorSupplier,
             Supplier<CalciteParser> calciteParserSupplier,
-            Function<TableSchema, SqlExprToRexConverter> 
sqlExprToRexConverterCreator,
             PlannerContext plannerContext) {
         super(
                 catalogManager,
                 validatorSupplier,
                 calciteParserSupplier,
-                sqlExprToRexConverterCreator);
+                plannerContext.getSqlExprToRexConverterFactory());
         this.plannerContext = plannerContext;
         this.catalogReader =
                 plannerContext.createCatalogReader(
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
index b23603e..dab4c5e 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.delegation.ParserFactory;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 
@@ -36,8 +35,6 @@ public class HiveParserFactory implements ParserFactory {
 
     @Override
     public Parser create(CatalogManager catalogManager, PlannerContext 
plannerContext) {
-        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
-                plannerContext::createSqlExprToRexConverter;
         return new HiveParser(
                 catalogManager,
                 () ->
@@ -45,9 +42,6 @@ public class HiveParserFactory implements ParserFactory {
                                 catalogManager.getCurrentCatalog(),
                                 catalogManager.getCurrentDatabase()),
                 plannerContext::createCalciteParser,
-                tableSchema ->
-                        sqlExprToRexConverterFactory.create(
-                                
plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                 plannerContext);
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9bb3530..bb6dd58 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -255,9 +255,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                                 return Optional.empty();
                             }
                         },
-                        (sqlExpression, inputSchema) -> {
+                        (sqlExpression, inputRowType, outputType) -> {
                             try {
-                                return 
getParser().parseSqlExpression(sqlExpression, inputSchema);
+                                return getParser()
+                                        .parseSqlExpression(
+                                                sqlExpression, inputRowType, 
outputType);
                             } catch (Throwable t) {
                                 throw new ValidationException(
                                         String.format("Invalid SQL expression: 
%s", sqlExpression),
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
index 5b008cc..0993669 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -144,7 +144,8 @@ class DefaultSchemaResolver implements SchemaResolver {
             UnresolvedComputedColumn unresolvedColumn, List<Column> 
inputColumns) {
         final ResolvedExpression resolvedExpression;
         try {
-            resolvedExpression = resolveExpression(inputColumns, 
unresolvedColumn.getExpression());
+            resolvedExpression =
+                    resolveExpression(inputColumns, 
unresolvedColumn.getExpression(), null);
         } catch (Exception e) {
             throw new ValidationException(
                     String.format(
@@ -189,22 +190,26 @@ class DefaultSchemaResolver implements SchemaResolver {
         final ResolvedExpression watermarkExpression;
         try {
             watermarkExpression =
-                    resolveExpression(inputColumns, 
watermarkSpec.getWatermarkExpression());
+                    resolveExpression(
+                            inputColumns,
+                            watermarkSpec.getWatermarkExpression(),
+                            validatedTimeColumn.getDataType());
         } catch (Exception e) {
             throw new ValidationException(
                     String.format(
                             "Invalid expression for watermark '%s'.", 
watermarkSpec.toString()),
                     e);
         }
-        
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+        final LogicalType outputType = 
watermarkExpression.getOutputDataType().getLogicalType();
+        final LogicalType timeColumnType = 
validatedTimeColumn.getDataType().getLogicalType();
+        validateWatermarkExpression(outputType);
 
-        if 
(!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
-                == 
validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
+        if (outputType.getTypeRoot() != timeColumnType.getTypeRoot()) {
             throw new ValidationException(
                     String.format(
-                            "The watermark output type %s is different from 
input time filed type %s.",
-                            watermarkExpression.getOutputDataType(),
-                            validatedTimeColumn.getDataType()));
+                            "The watermark declaration's output data type '%s' 
is different "
+                                    + "from the time field's data type '%s'.",
+                            outputType, timeColumnType));
         }
 
         return Collections.singletonList(
@@ -348,13 +353,15 @@ class DefaultSchemaResolver implements SchemaResolver {
         }
     }
 
-    private ResolvedExpression resolveExpression(List<Column> columns, 
Expression expression) {
+    private ResolvedExpression resolveExpression(
+            List<Column> columns, Expression expression, @Nullable DataType 
outputDataType) {
         final LocalReferenceExpression[] localRefs =
                 columns.stream()
                         .map(c -> localRef(c.getName(), c.getDataType()))
                         .toArray(LocalReferenceExpression[]::new);
         return resolverBuilder
                 .withLocalReferences(localRefs)
+                .withOutputDataType(outputDataType)
                 .build()
                 .resolve(Collections.singletonList(expression))
                 .get(0);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
index 20f07e1..0ff4567 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
@@ -19,11 +19,14 @@
 package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
 
 import java.util.List;
 
@@ -58,11 +61,13 @@ public interface Parser {
      * Entry point for parsing SQL expressions expressed as a String.
      *
      * @param sqlExpression the SQL expression to parse
-     * @param inputSchema the schema of the fields in sql expression
+     * @param inputRowType the fields available in the SQL expression
+     * @param outputType expected top-level output type if available
      * @return resolved expression
      * @throws org.apache.flink.table.api.SqlParserException when failed to 
parse the sql expression
      */
-    ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema 
inputSchema);
+    ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType);
 
     /**
      * Returns completion hints for the given statement at the given cursor 
position. The completion
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 3c58f0f..fce7309 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -43,6 +43,8 @@ import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -117,6 +119,8 @@ public class ExpressionResolver {
 
     private final Map<String, LocalReferenceExpression> localReferences;
 
+    private final @Nullable DataType outputDataType;
+
     private final Map<Expression, LocalOverWindow> localOverWindows;
 
     private final boolean isGroupedAggregation;
@@ -130,6 +134,7 @@ public class ExpressionResolver {
             FieldReferenceLookup fieldLookup,
             List<OverWindow> localOverWindows,
             List<LocalReferenceExpression> localReferences,
+            @Nullable DataType outputDataType,
             boolean isGroupedAggregation) {
         this.config = Preconditions.checkNotNull(config).getConfiguration();
         this.tableLookup = Preconditions.checkNotNull(tableLookup);
@@ -149,6 +154,7 @@ public class ExpressionResolver {
                                                     "Duplicate local 
reference: " + u);
                                         },
                                         LinkedHashMap::new));
+        this.outputDataType = outputDataType;
         this.localOverWindows = prepareOverWindows(localOverWindows);
         this.isGroupedAggregation = isGroupedAggregation;
     }
@@ -324,6 +330,11 @@ public class ExpressionResolver {
         }
 
         @Override
+        public Optional<DataType> getOutputDataType() {
+            return Optional.ofNullable(outputDataType);
+        }
+
+        @Override
         public Optional<LocalOverWindow> getOverWindow(Expression alias) {
             return Optional.ofNullable(localOverWindows.get(alias));
         }
@@ -443,6 +454,7 @@ public class ExpressionResolver {
         private final SqlExpressionResolver sqlExpressionResolver;
         private List<OverWindow> logicalOverWindows = new ArrayList<>();
         private List<LocalReferenceExpression> localReferences = new 
ArrayList<>();
+        private @Nullable DataType outputDataType;
         private boolean isGroupedAggregation;
 
         private ExpressionResolverBuilder(
@@ -471,6 +483,11 @@ public class ExpressionResolver {
             return this;
         }
 
+        public ExpressionResolverBuilder withOutputDataType(@Nullable DataType 
outputDataType) {
+            this.outputDataType = outputDataType;
+            return this;
+        }
+
         public ExpressionResolverBuilder withGroupedAggregation(boolean 
isGroupedAggregation) {
             this.isGroupedAggregation = isGroupedAggregation;
             return this;
@@ -486,6 +503,7 @@ public class ExpressionResolver {
                     new FieldReferenceLookup(queryOperations),
                     logicalOverWindows,
                     localReferences,
+                    outputDataType,
                     isGroupedAggregation);
         }
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
index fd1bd7a..516837d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
@@ -19,14 +19,18 @@
 package org.apache.flink.table.expressions.resolver;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
 
 /** Translates a SQL expression string into a {@link ResolvedExpression}. */
 @Internal
 public interface SqlExpressionResolver {
 
     /** Translates the given SQL expression string or throws a {@link 
ValidationException}. */
-    ResolvedExpression resolveExpression(String sqlExpression, TableSchema 
inputSchema);
+    ResolvedExpression resolveExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
index cf2d74b..fb8de72 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -19,13 +19,19 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,32 +43,49 @@ final class ResolveSqlCallRule implements ResolverRule {
 
     @Override
     public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
-        return expression.stream()
-                .map(expr -> expr.accept(new 
TranslateSqlCallsVisitor(context)))
-                .collect(Collectors.toList());
+        // only the top-level expressions may access the output data type
+        final LogicalType outputType =
+                
context.getOutputDataType().map(DataType::getLogicalType).orElse(null);
+        final TranslateSqlCallsVisitor visitor = new 
TranslateSqlCallsVisitor(context, outputType);
+        return expression.stream().map(expr -> 
expr.accept(visitor)).collect(Collectors.toList());
     }
 
     private static class TranslateSqlCallsVisitor extends 
RuleExpressionVisitor<Expression> {
 
-        TranslateSqlCallsVisitor(ResolutionContext resolutionContext) {
+        private final @Nullable LogicalType outputType;
+
+        TranslateSqlCallsVisitor(
+                ResolutionContext resolutionContext, @Nullable LogicalType 
outputType) {
             super(resolutionContext);
+            this.outputType = outputType;
         }
 
         @Override
         public Expression visit(SqlCallExpression sqlCall) {
             final SqlExpressionResolver resolver = 
resolutionContext.sqlExpressionResolver();
 
-            final TableSchema.Builder builder = TableSchema.builder();
+            final List<RowField> fields = new ArrayList<>();
             // input references
             resolutionContext
                     .referenceLookup()
                     .getAllInputFields()
-                    .forEach(f -> builder.field(f.getName(), 
f.getOutputDataType()));
+                    .forEach(
+                            f ->
+                                    fields.add(
+                                            new RowField(
+                                                    f.getName(),
+                                                    
f.getOutputDataType().getLogicalType())));
             // local references
             resolutionContext
                     .getLocalReferences()
-                    .forEach(refs -> builder.field(refs.getName(), 
refs.getOutputDataType()));
-            return resolver.resolveExpression(sqlCall.getSqlExpression(), 
builder.build());
+                    .forEach(
+                            refs ->
+                                    fields.add(
+                                            new RowField(
+                                                    refs.getName(),
+                                                    
refs.getOutputDataType().getLogicalType())));
+            return resolver.resolveExpression(
+                    sqlCall.getSqlExpression(), new RowType(false, fields), 
outputType);
         }
 
         @Override
@@ -76,8 +99,10 @@ final class ResolveSqlCallRule implements ResolverRule {
         }
 
         private List<Expression> resolveChildren(List<Expression> 
lookupChildren) {
+            final TranslateSqlCallsVisitor visitor =
+                    new TranslateSqlCallsVisitor(resolutionContext, null);
             return lookupChildren.stream()
-                    .map(child -> child.accept(this))
+                    .map(child -> child.accept(visitor))
                     .collect(Collectors.toList());
         }
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index bc212ce..0ee8c5e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
 import 
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup;
 import 
org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
 
 import java.util.List;
 import java.util.Optional;
@@ -87,6 +88,9 @@ public interface ResolverRule {
         /** Access to available local references. */
         List<LocalReferenceExpression> getLocalReferences();
 
+        /** Access to the expected top-level output data type. */
+        Optional<DataType> getOutputDataType();
+
         /** Access to available local over windows. */
         Optional<LocalOverWindow> getOverWindow(Expression alias);
 
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index cef78b3..5085ad7 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -26,11 +26,15 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import 
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -268,7 +272,7 @@ public class CatalogBaseTableResolutionTest {
     }
 
     private static ResolvedExpression resolveSqlExpression(
-            String sqlExpression, TableSchema inputSchema) {
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType) {
         switch (sqlExpression) {
             case COMPUTED_SQL:
                 return COMPUTED_COLUMN_RESOLVED;
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index aeb5bda..6e0edd6 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -21,18 +21,20 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -217,7 +219,8 @@ public class SchemaResolutionTest {
                         .column("ts", DataTypes.TIMESTAMP(3))
                         .watermark("ts", callSql(INVALID_WATERMARK_SQL))
                         .build(),
-                "The watermark output type TIMESTAMP_LTZ(3) is different from 
input time filed type TIMESTAMP(3).");
+                "The watermark declaration's output data type 
'TIMESTAMP_LTZ(3)' is "
+                        + "different from the time field's data type 
'TIMESTAMP(3)'.");
 
         testError(
                 Schema.newBuilder()
@@ -440,27 +443,27 @@ public class SchemaResolutionTest {
     }
 
     private static ResolvedExpression resolveSqlExpression(
-            String sqlExpression, TableSchema inputSchema) {
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType) {
         switch (sqlExpression) {
             case COMPUTED_SQL:
                 assertThat(
-                        inputSchema.getFieldDataType("orig_ts").orElse(null),
-                        equalTo(DataTypes.TIMESTAMP(3)));
+                        getType(inputRowType, "orig_ts"),
+                        equalTo(DataTypes.TIMESTAMP(3).getLogicalType()));
                 return COMPUTED_COLUMN_RESOLVED;
             case COMPUTED_SQL_WITH_TS_LTZ:
                 assertThat(
-                        inputSchema.getFieldDataType("ts_ltz").orElse(null),
-                        equalTo(DataTypes.TIMESTAMP_LTZ(3)));
+                        getType(inputRowType, "ts_ltz"),
+                        equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType()));
                 return COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ;
             case WATERMARK_SQL:
                 assertThat(
-                        inputSchema.getFieldDataType("ts").orElse(null),
-                        equalTo(DataTypes.TIMESTAMP(3)));
+                        getType(inputRowType, "ts"),
+                        equalTo(DataTypes.TIMESTAMP(3).getLogicalType()));
                 return WATERMARK_RESOLVED;
             case WATERMARK_SQL_WITH_TS_LTZ:
                 assertThat(
-                        inputSchema.getFieldDataType("ts1").orElse(null),
-                        equalTo(DataTypes.TIMESTAMP_LTZ(3)));
+                        getType(inputRowType, "ts1"),
+                        equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType()));
                 return WATERMARK_RESOLVED_WITH_TS_LTZ;
             case PROCTIME_SQL:
                 return PROCTIME_RESOLVED;
@@ -478,4 +481,9 @@ public class SchemaResolutionTest {
                 .getDataType()
                 .getLogicalType();
     }
+
+    private static LogicalType getType(RowType inputRowType, String field) {
+        final int pos = inputRowType.getFieldIndex(field);
+        return inputRowType.getTypeAt(pos);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index f48033d..b89d09b 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -332,7 +332,7 @@ public class ExpressionResolverTest {
                             name -> Optional.empty(),
                             new FunctionLookupMock(functions),
                             new DataTypeFactoryMock(),
-                            (sqlExpression, inputSchema) -> {
+                            (sqlExpression, inputRowType, outputType) -> {
                                 throw new UnsupportedOperationException();
                             },
                             Arrays.stream(schemas)
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
index a18c1be..d011b9c 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
@@ -496,7 +496,7 @@ public class ValuesOperationTreeBuilderTest {
                     new FunctionLookupMock(Collections.emptyMap()),
                     new DataTypeFactoryMock(),
                     name -> Optional.empty(), // do not support
-                    (sqlExpression, inputSchema) -> {
+                    (sqlExpression, inputRowType, outputType) -> {
                         throw new UnsupportedOperationException();
                     },
                     true);
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
index 4deb785..68970fe 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
@@ -47,7 +47,7 @@ public final class ExpressionResolverMocks {
 
     public static ExpressionResolverBuilder dummyResolver() {
         return forSqlExpression(
-                (sqlExpression, inputSchema) -> {
+                (sqlExpression, inputRowType, outputType) -> {
                     throw new UnsupportedOperationException();
                 });
     }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
index c56ee9c..526c682 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.utils;
 
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
 
 import java.util.List;
 
@@ -39,7 +42,8 @@ public class ParserMock implements Parser {
     }
 
     @Override
-    public ResolvedExpression parseSqlExpression(String sqlExpression, 
TableSchema inputSchema) {
+    public ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType) {
         return null;
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 74f94f8..ad58228 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.connector.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.RuntimeConverter;
@@ -96,8 +96,7 @@ public interface DynamicTableSink {
      * interfaces might be located in other Flink modules.
      *
      * <p>Independent of the provider interface, the table runtime expects 
that a sink
-     * implementation accepts internal data structures (see {@link
-     * org.apache.flink.table.data.RowData} for more information).
+     * implementation accepts internal data structures (see {@link RowData} 
for more information).
      *
      * <p>The given {@link Context} offers utilities by the planner for 
creating runtime
      * implementation with minimal dependencies to internal data structures.
@@ -146,7 +145,7 @@ public interface DynamicTableSink {
          * Creates type information describing the internal data structures of 
the given {@link
          * DataType}.
          *
-         * @see TableSchema#toPhysicalRowDataType()
+         * @see ResolvedSchema#toPhysicalRowDataType()
          */
         <T> TypeInformation<T> createTypeInformation(DataType 
consumedDataType);
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index ce4f359..7d626a2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -117,7 +117,7 @@ public final class BuiltInFunctionDefinitions {
                     .name("SOURCE_WATERMARK")
                     .kind(SCALAR)
                     .inputTypeStrategy(NO_ARGS)
-                    .outputTypeStrategy(explicit(DataTypes.TIMESTAMP(3)))
+                    .outputTypeStrategy(TypeStrategies.SOURCE_WATERMARK)
                     .runtimeClass(
                             
"org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction")
                     .build();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
index 1815ed1..d65b151 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
@@ -69,6 +69,7 @@ public final class TypeStrategies {
     /** Placeholder for a missing type strategy. */
     public static final TypeStrategy MISSING = new MissingTypeStrategy();
 
+    /** Type strategy that returns a common, least restrictive type of all 
arguments. */
     public static final TypeStrategy COMMON = new CommonTypeStrategy();
 
     /** Type strategy that returns a fixed {@link DataType}. */
@@ -418,6 +419,21 @@ public final class TypeStrategies {
                 return Optional.of(nullReplacementDataType);
             };
 
+    /** Type strategy specific for source watermarks that depend on the output 
type. */
+    public static final TypeStrategy SOURCE_WATERMARK =
+            callContext -> {
+                final DataType timestampDataType =
+                        callContext
+                                .getOutputDataType()
+                                .filter(
+                                        dt ->
+                                                hasFamily(
+                                                        dt.getLogicalType(),
+                                                        
LogicalTypeFamily.TIMESTAMP))
+                                .orElse(DataTypes.TIMESTAMP_LTZ(3));
+                return Optional.of(timestampDataType);
+            };
+
     /**
      * Type strategy specific for aggregations that partially produce 
different nullability
      * depending whether the result is grouped or not.
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 6ca4202..bd5a4a8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -42,6 +42,7 @@ import org.apache.calcite.util.Static;
 
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 
@@ -49,6 +50,10 @@ import static 
org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 @Internal
 public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
 
+    // Enables CallContext#getOutputDataType() when validating SQL expressions.
+    private SqlNode sqlNodeForExpectedOutputType;
+    private RelDataType expectedOutputType;
+
     public FlinkCalciteSqlValidator(
             SqlOperatorTable opTab,
             SqlValidatorCatalogReader catalogReader,
@@ -57,6 +62,18 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
         super(opTab, catalogReader, typeFactory, config);
     }
 
+    public void setExpectedOutputType(SqlNode sqlNode, RelDataType 
expectedOutputType) {
+        this.sqlNodeForExpectedOutputType = sqlNode;
+        this.expectedOutputType = expectedOutputType;
+    }
+
+    public Optional<RelDataType> getExpectedOutputType(SqlNode sqlNode) {
+        if (sqlNode == sqlNodeForExpectedOutputType) {
+            return Optional.of(expectedOutputType);
+        }
+        return Optional.empty();
+    }
+
     @Override
     public void validateLiteral(SqlLiteral literal) {
         if (literal.getTypeName() == DECIMAL) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
index 473d4d4..49294c11 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.table.planner.calcite;
 
+import org.apache.flink.annotation.Internal;
+
 import org.apache.calcite.rex.RexNode;
 
 /** Converts SQL expressions to {@link RexNode}. */
+@Internal
 public interface SqlExprToRexConverter {
 
     /**
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
index e1a8cc9..83ed640 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
@@ -18,13 +18,28 @@
 
 package org.apache.flink.table.planner.calcite;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import javax.annotation.Nullable;
 
 /** Factory to create {@link SqlExprToRexConverter}. */
+@Internal
 public interface SqlExprToRexConverterFactory {
 
     /**
-     * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL 
expression to RexNode.
+     * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL 
expression to {@link
+     * RexNode}.
+     */
+    SqlExprToRexConverter create(RelDataType inputRowType, @Nullable 
RelDataType outputType);
+
+    /**
+     * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL 
expression to {@link
+     * RexNode}.
      */
-    SqlExprToRexConverter create(RelDataType tableRowType);
+    SqlExprToRexConverter create(RowType inputRowType, @Nullable LogicalType 
outputType);
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
index c0fa221..8bc2d00 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
@@ -31,6 +31,8 @@ import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.FrameworkConfig;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Properties;
 import java.util.stream.Stream;
@@ -42,14 +44,17 @@ public class SqlExprToRexConverterImpl implements 
SqlExprToRexConverter {
 
     private final SqlDialect sqlDialect;
 
-    private final RelDataType tableRowType;
+    private final RelDataType inputRowType;
+
+    private final @Nullable RelDataType outputType;
 
     public SqlExprToRexConverterImpl(
             FrameworkConfig config,
             FlinkTypeFactory typeFactory,
             RelOptCluster cluster,
             SqlDialect sqlDialect,
-            RelDataType tableRowType) {
+            RelDataType inputRowType,
+            @Nullable RelDataType outputType) {
         this.planner =
                 new FlinkPlannerImpl(
                         config,
@@ -57,20 +62,22 @@ public class SqlExprToRexConverterImpl implements 
SqlExprToRexConverter {
                         typeFactory,
                         cluster);
         this.sqlDialect = sqlDialect;
-        this.tableRowType = tableRowType;
+        this.inputRowType = inputRowType;
+        this.outputType = outputType;
     }
 
     @Override
     public String expand(String expr) {
         final CalciteParser parser = planner.parser();
         final SqlNode node = parser.parseExpression(expr);
-        final SqlNode validated = planner.validateExpression(node, 
tableRowType);
+        final SqlNode validated = planner.validateExpression(node, 
inputRowType, outputType);
         return validated.toSqlString(sqlDialect).getSql();
     }
 
     @Override
     public RexNode convertToRexNode(String expr) {
-        return convertToRexNodes(new String[] {expr})[0];
+        final CalciteParser parser = planner.parser();
+        return planner.rex(parser.parseExpression(expr), inputRowType, 
outputType);
     }
 
     @Override
@@ -78,7 +85,7 @@ public class SqlExprToRexConverterImpl implements 
SqlExprToRexConverter {
         final CalciteParser parser = planner.parser();
         return Stream.of(exprs)
                 .map(parser::parseExpression)
-                .map(node -> planner.rex(node, tableRowType))
+                .map(node -> planner.rex(node, inputRowType, null))
                 .toArray(RexNode[]::new);
     }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
index af968a0..99631f1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -33,8 +32,6 @@ import java.util.Map;
 public class DefaultParserFactory implements ParserFactory {
     @Override
     public Parser create(CatalogManager catalogManager, PlannerContext 
plannerContext) {
-        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
-                plannerContext::createSqlExprToRexConverter;
         return new ParserImpl(
                 catalogManager,
                 () ->
@@ -42,9 +39,7 @@ public class DefaultParserFactory implements ParserFactory {
                                 catalogManager.getCurrentCatalog(),
                                 catalogManager.getCurrentDatabase()),
                 plannerContext::createCalciteParser,
-                tableSchema ->
-                        sqlExprToRexConverterFactory.create(
-                                
plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
+                plannerContext.getSqlExprToRexConverterFactory());
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index e9136a5..e4f7d53 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.delegation;
 
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.delegation.Parser;
@@ -28,11 +27,13 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
 import org.apache.flink.table.planner.operations.SqlToOperationConverter;
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.parse.ExtendedParser;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.rex.RexNode;
@@ -41,12 +42,13 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.advise.SqlAdvisor;
 import org.apache.calcite.sql.advise.SqlAdvisorValidator;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -60,18 +62,18 @@ public class ParserImpl implements Parser {
     // multiple statements parsing
     private final Supplier<FlinkPlannerImpl> validatorSupplier;
     private final Supplier<CalciteParser> calciteParserSupplier;
-    private final Function<TableSchema, SqlExprToRexConverter> 
sqlExprToRexConverterCreator;
+    private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory;
     private static final ExtendedParser EXTENDED_PARSER = 
ExtendedParser.INSTANCE;
 
     public ParserImpl(
             CatalogManager catalogManager,
             Supplier<FlinkPlannerImpl> validatorSupplier,
             Supplier<CalciteParser> calciteParserSupplier,
-            Function<TableSchema, SqlExprToRexConverter> 
sqlExprToRexConverterCreator) {
+            SqlExprToRexConverterFactory sqlExprToRexConverterFactory) {
         this.catalogManager = catalogManager;
         this.validatorSupplier = validatorSupplier;
         this.calciteParserSupplier = calciteParserSupplier;
-        this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator;
+        this.sqlExprToRexConverterFactory = sqlExprToRexConverterFactory;
     }
 
     /**
@@ -109,9 +111,10 @@ public class ParserImpl implements Parser {
     }
 
     @Override
-    public ResolvedExpression parseSqlExpression(String sqlExpression, 
TableSchema inputSchema) {
+    public ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType) {
         final SqlExprToRexConverter sqlExprToRexConverter =
-                sqlExprToRexConverterCreator.apply(inputSchema);
+                sqlExprToRexConverterFactory.create(inputRowType, outputType);
         final RexNode rexNode = 
sqlExprToRexConverter.convertToRexNode(sqlExpression);
         final LogicalType logicalType = 
FlinkTypeFactory.toLogicalType(rexNode.getType());
         // expand expression for serializable expression strings similar to 
views
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 2d856d9..f9c9397 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.planner.calcite.FlinkRexBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
 import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl;
 import org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable;
 import org.apache.flink.table.planner.codegen.ExpressionReducer;
@@ -47,6 +48,8 @@ import 
org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -69,6 +72,8 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 import static java.util.Arrays.asList;
@@ -87,6 +92,8 @@ public class PlannerContext {
 
     private final RelDataTypeSystem typeSystem = new FlinkTypeSystem();
     private final FlinkTypeFactory typeFactory = new 
FlinkTypeFactory(typeSystem);
+    private final SqlExprToRexConverterFactory rexConverterFactory =
+            new DefaultSqlExprToRexConverterFactory();
     private final TableConfig tableConfig;
     private final RelOptCluster cluster;
     private final FlinkContext context;
@@ -104,10 +111,7 @@ public class PlannerContext {
 
         this.context =
                 new FlinkContextImpl(
-                        tableConfig,
-                        functionCatalog,
-                        catalogManager,
-                        this::createSqlExprToRexConverter);
+                        tableConfig, functionCatalog, catalogManager, 
rexConverterFactory);
 
         this.rootSchema = rootSchema;
         this.traitDefs = traitDefs;
@@ -125,13 +129,8 @@ public class PlannerContext {
         this.cluster = FlinkRelOptClusterFactory.create(planner, new 
FlinkRexBuilder(typeFactory));
     }
 
-    public SqlExprToRexConverter createSqlExprToRexConverter(RelDataType 
rowType) {
-        return new SqlExprToRexConverterImpl(
-                checkNotNull(frameworkConfig),
-                checkNotNull(typeFactory),
-                checkNotNull(cluster),
-                checkNotNull(getCalciteSqlDialect()),
-                rowType);
+    public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
+        return rexConverterFactory;
     }
 
     public FrameworkConfig createFrameworkConfig() {
@@ -328,4 +327,38 @@ public class PlannerContext {
                         typeFactory),
                 FlinkSqlOperatorTable.instance());
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // DefaultSqlExprToRexConverterFactory
+    // 
--------------------------------------------------------------------------------------------
+
+    private class DefaultSqlExprToRexConverterFactory implements 
SqlExprToRexConverterFactory {
+
+        @Override
+        public SqlExprToRexConverter create(
+                RelDataType inputRowType, @Nullable RelDataType outputType) {
+            return new SqlExprToRexConverterImpl(
+                    checkNotNull(frameworkConfig),
+                    checkNotNull(typeFactory),
+                    checkNotNull(cluster),
+                    checkNotNull(getCalciteSqlDialect()),
+                    inputRowType,
+                    outputType);
+        }
+
+        @Override
+        public SqlExprToRexConverter create(
+                RowType inputRowType, @Nullable LogicalType outputType) {
+            final RelDataType convertedInputRowType = 
typeFactory.buildRelNodeRowType(inputRowType);
+
+            final RelDataType convertedOutputType;
+            if (outputType != null) {
+                convertedOutputType = 
typeFactory.createFieldTypeFromLogicalType(outputType);
+            } else {
+                convertedOutputType = null;
+            }
+
+            return create(convertedInputRowType, convertedOutputType);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
index 9f2edac..8d37c48 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
@@ -52,7 +52,7 @@ public class CallExpressionResolver {
                                                             "We should not 
need to lookup any expressions at this point");
                                                 }),
                                 
context.getCatalogManager().getDataTypeFactory(),
-                                (sqlExpression, inputSchema) -> {
+                                (sqlExpression, inputRowType, outputType) -> {
                                     throw new TableException(
                                             "SQL expression parsing is not 
supported at this location.");
                                 })
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
index 3e3ffd2..8bb177b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
@@ -29,9 +30,12 @@ import 
org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
 import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 
+import javax.annotation.Nullable;
+
 import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
 import static 
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException;
 import static 
org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
@@ -63,7 +67,11 @@ public final class TypeInferenceReturnInference implements 
SqlReturnTypeInferenc
     @Override
     public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
         final CallContext callContext =
-                new OperatorBindingCallContext(dataTypeFactory, definition, 
opBinding, null);
+                new OperatorBindingCallContext(
+                        dataTypeFactory,
+                        definition,
+                        opBinding,
+                        extractExpectedOutputType(opBinding));
         try {
             return inferReturnTypeOrError(unwrapTypeFactory(opBinding), 
callContext);
         } catch (ValidationException e) {
@@ -75,6 +83,16 @@ public final class TypeInferenceReturnInference implements 
SqlReturnTypeInferenc
 
     // 
--------------------------------------------------------------------------------------------
 
+    private @Nullable RelDataType extractExpectedOutputType(SqlOperatorBinding 
opBinding) {
+        if (opBinding instanceof SqlCallBinding) {
+            final SqlCallBinding binding = (SqlCallBinding) opBinding;
+            final FlinkCalciteSqlValidator validator =
+                    (FlinkCalciteSqlValidator) binding.getValidator();
+            return 
validator.getExpectedOutputType(binding.getCall()).orElse(null);
+        }
+        return null;
+    }
+
     private RelDataType inferReturnTypeOrError(
             FlinkTypeFactory typeFactory, CallContext callContext) {
         final LogicalType inferredType =
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 713eea3..1afc2c1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -106,7 +106,7 @@ public class FilterPushDownSpec extends 
SourceAbilitySpecBase {
                                                                 "We should not 
need to lookup any expressions at this point");
                                                     }),
                                     
context.getCatalogManager().getDataTypeFactory(),
-                                    (sqlExpression, inputSchema) -> {
+                                    (sqlExpression, inputRowType, outputType) 
-> {
                                         throw new TableException(
                                                 "SQL expression parsing is not 
supported at this location.");
                                     })
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 6e3afbc..e036e17 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -40,6 +40,8 @@ import org.apache.calcite.tools.{FrameworkConfig, 
RelConversionException}
 import org.apache.flink.sql.parser.ddl.SqlUseModules
 import org.apache.flink.table.planner.parse.CalciteParser
 
+import javax.annotation.Nullable
+
 import java.lang.{Boolean => JBoolean}
 import java.util
 import java.util.function.{Function => JFunction}
@@ -179,35 +181,43 @@ class FlinkPlannerImpl(
     }
   }
 
-  def validateExpression(sqlNode: SqlNode, inputRowType: RelDataType): SqlNode 
= {
-    validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType)
+  def validateExpression(
+      sqlNode: SqlNode,
+      inputRowType: RelDataType,
+      @Nullable outputType: RelDataType): SqlNode = {
+    validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType, 
outputType)
   }
 
   private def validateExpression(
       sqlNode: SqlNode,
       sqlValidator: FlinkCalciteSqlValidator,
-      inputRowType: RelDataType): SqlNode = {
-      val nameToTypeMap = inputRowType
-        .getFieldList
+      inputRowType: RelDataType,
+      @Nullable outputType: RelDataType)
+    : SqlNode = {
+      val nameToTypeMap = new util.HashMap[String, RelDataType]()
+      inputRowType.getFieldList
         .asScala
-        .map { field =>
-          (field.getName, field.getType)
-        }
-        .toMap[String, RelDataType]
-        .asJava
+        .foreach(f => nameToTypeMap.put(f.getName, f.getType))
+      if (outputType != null) {
+        sqlValidator.setExpectedOutputType(sqlNode, outputType)
+      }
       sqlValidator.validateParameterizedExpression(sqlNode, nameToTypeMap)
   }
 
-  def rex(sqlNode: SqlNode, inputRowType: RelDataType): RexNode = {
-    rex(sqlNode, getOrCreateSqlValidator(), inputRowType)
+  def rex(
+      sqlNode: SqlNode,
+      inputRowType: RelDataType,
+      @Nullable outputType: RelDataType): RexNode = {
+    rex(sqlNode, getOrCreateSqlValidator(), inputRowType, outputType)
   }
 
   private def rex(
       sqlNode: SqlNode,
       sqlValidator: FlinkCalciteSqlValidator,
-      inputRowType: RelDataType) = {
+      inputRowType: RelDataType,
+      @Nullable outputType: RelDataType) = {
     try {
-      val validatedSqlNode = validateExpression(sqlNode, sqlValidator, 
inputRowType)
+      val validatedSqlNode = validateExpression(sqlNode, sqlValidator, 
inputRowType, outputType)
       val sqlToRelConverter = createSqlToRelConverter(sqlValidator)
       val nameToNodeMap = inputRowType
         .getFieldList
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 8a0902b..e676771 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -92,11 +92,6 @@ abstract class PlannerBase(
   // temporary utility until we don't use planner expressions anymore
   
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
-  private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory {
-    override def create(tableRowType: RelDataType): SqlExprToRexConverter =
-      plannerContext.createSqlExprToRexConverter(tableRowType)
-  }
-
   private var parser: Parser = _
   private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
 
@@ -110,6 +105,8 @@ abstract class PlannerBase(
       getTraitDefs.toList
     )
 
+  private val sqlExprToRexConverterFactory = 
plannerContext.getSqlExprToRexConverterFactory
+
   /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
   private[flink] def getRelBuilder: FlinkRelBuilder = {
     val currentCatalogName = catalogManager.getCurrentCatalog
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index b60ae51..c0e8a76 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -135,7 +135,7 @@ class LegacyCatalogSourceTable[T](
             s"`$name`"
           }
         }.toArray
-      val rexNodes = 
toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
+      val rexNodes = toRexFactory.create(newRelTable.getRowType, 
null).convertToRexNodes(fieldExprs)
       relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
     }
 
@@ -158,7 +158,7 @@ class LegacyCatalogSourceTable[T](
       }
       val rowtimeIndex = fieldNames.indexOf(rowtime)
       val watermarkRexNode = toRexFactory
-        .create(actualRowType)
+        .create(actualRowType, null)
         .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
       relBuilder.watermark(rowtimeIndex, watermarkRexNode)
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
index fc70477..1c24173 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
@@ -76,9 +76,7 @@ public class ParserImplTest {
                     catalogManager,
                     plannerSupplier,
                     () -> plannerSupplier.get().parser(),
-                    t ->
-                            plannerContext.createSqlExprToRexConverter(
-                                    
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+                    plannerContext.getSqlExprToRexConverterFactory());
 
     private static final List<TestSpec> TEST_SPECS =
             Arrays.asList(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index cc5f7bf..26cd1cc 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -129,17 +129,6 @@ public class SqlToOperationConverterTest {
                             .createFlinkPlanner(
                                     catalogManager.getCurrentCatalog(),
                                     catalogManager.getCurrentDatabase());
-    private final Parser parser =
-            new ParserImpl(
-                    catalogManager,
-                    plannerSupplier,
-                    () -> plannerSupplier.get().parser(),
-                    t ->
-                            getPlannerContext()
-                                    .createSqlExprToRexConverter(
-                                            getPlannerContext()
-                                                    .getTypeFactory()
-                                                    .buildRelNodeRowType(t)));
     private final PlannerContext plannerContext =
             new PlannerContext(
                     tableConfig,
@@ -148,6 +137,13 @@ public class SqlToOperationConverterTest {
                     asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
                     Collections.emptyList());
 
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    getPlannerContext().getSqlExprToRexConverterFactory());
+
     private PlannerContext getPlannerContext() {
         return plannerContext;
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 9ead357..2f0566a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -28,11 +28,13 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Column;
@@ -48,6 +50,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -57,6 +60,8 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.time.DayOfWeek;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -246,7 +251,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                 tableEnv.fromDataStream(
                         dataStream,
                         Schema.newBuilder()
-                                .columnByMetadata("rowtime", "TIMESTAMP(3)")
+                                .columnByMetadata("rowtime", 
"TIMESTAMP_LTZ(3)")
                                 .watermark("rowtime", "SOURCE_WATERMARK()")
                                 .build());
 
@@ -257,12 +262,14 @@ public class DataStreamJavaITCase extends 
AbstractTestBase {
                                 Column.physical("f0", 
DataTypes.BIGINT().notNull()),
                                 Column.physical("f1", 
DataTypes.INT().notNull()),
                                 Column.physical("f2", DataTypes.STRING()),
-                                Column.metadata("rowtime", 
DataTypes.TIMESTAMP(3), null, false)),
+                                Column.metadata(
+                                        "rowtime", DataTypes.TIMESTAMP_LTZ(3), 
null, false)),
                         Collections.singletonList(
                                 WatermarkSpec.of(
                                         "rowtime",
                                         ResolvedExpressionMock.of(
-                                                DataTypes.TIMESTAMP(3), 
"`SOURCE_WATERMARK`()"))),
+                                                DataTypes.TIMESTAMP_LTZ(3),
+                                                "`SOURCE_WATERMARK`()"))),
                         null));
 
         tableEnv.createTemporaryView("t", table);
@@ -305,7 +312,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                 tableEnv.fromChangelogStream(
                         changelogStream,
                         Schema.newBuilder()
-                                .columnByMetadata("rowtime", 
DataTypes.TIMESTAMP(3))
+                                .columnByMetadata("rowtime", 
DataTypes.TIMESTAMP_LTZ(3))
                                 .watermark("rowtime", "SOURCE_WATERMARK()")
                                 .build());
         tableEnv.createTemporaryView("t", table);
@@ -431,6 +438,62 @@ public class DataStreamJavaITCase extends AbstractTestBase 
{
                 getOutput(inputOrOutput));
     }
 
+    @Test
+    public void testToDataStreamCustomEventTime() throws Exception {
+        final TableConfig config = tableEnv.getConfig();
+
+        // session time zone should not have an impact on the conversion
+        final ZoneId originalZone = config.getLocalTimeZone();
+        config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+
+        final LocalDateTime localDateTime1 = 
LocalDateTime.parse("1970-01-01T00:00:00.000");
+        final LocalDateTime localDateTime2 = 
LocalDateTime.parse("1970-01-01T01:00:00.000");
+
+        final DataStream<Tuple2<LocalDateTime, String>> dataStream =
+                env.fromElements(
+                        new Tuple2<>(localDateTime1, "alice"), new 
Tuple2<>(localDateTime2, "bob"));
+
+        final Table table =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", "TIMESTAMP(3)")
+                                .column("f1", "STRING")
+                                .watermark("f0", "SOURCE_WATERMARK()")
+                                .build());
+
+        testSchema(
+                table,
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("f0", DataTypes.TIMESTAMP(3)),
+                                Column.physical("f1", DataTypes.STRING())),
+                        Collections.singletonList(
+                                WatermarkSpec.of(
+                                        "f0",
+                                        ResolvedExpressionMock.of(
+                                                DataTypes.TIMESTAMP(3), 
"`SOURCE_WATERMARK`()"))),
+                        null));
+
+        final DataStream<Long> rowtimeStream =
+                tableEnv.toDataStream(table)
+                        .process(
+                                new ProcessFunction<Row, Long>() {
+                                    @Override
+                                    public void processElement(
+                                            Row value, Context ctx, 
Collector<Long> out) {
+                                        out.collect(ctx.timestamp());
+                                    }
+                                });
+
+        testResult(
+                rowtimeStream,
+                
localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(),
+                
localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli());
+
+        config.setLocalTimeZone(originalZone);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Helper methods
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index f7dcf76..becbed1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -59,9 +59,7 @@ public class PlannerMocks {
                         catalogManager,
                         () -> planner,
                         planner::parser,
-                        t ->
-                                plannerContext.createSqlExprToRexConverter(
-                                        
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+                        plannerContext.getSqlExprToRexConverterFactory());
         catalogManager.initSchemaResolver(
                 isStreamingMode,
                 ExpressionResolver.resolverFor(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 90c5010..00350e0 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -60,27 +60,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: 
Boolean) {
   val config = new TableConfig
   val catalogManager: CatalogManager = 
CatalogManagerMocks.createEmptyCatalogManager()
   val functionCatalog = new FunctionCatalog(config, catalogManager, new 
ModuleManager)
-  private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory {
-    override def create(tableRowType: RelDataType): SqlExprToRexConverter =
-      createSqlExprToRexConverter(tableRowType)
-  }
-  private val parser: Parser = new ParserImpl(
-    catalogManager,
-    new JSupplier[FlinkPlannerImpl] {
-      override def get(): FlinkPlannerImpl = getPlanner
-    },
-    // we do not cache the parser in order to use the most up to
-    // date configuration. Users might change parser configuration in 
TableConfig in between
-    // parsing statements
-    new JSupplier[CalciteParser] {
-      override def get(): CalciteParser = plannerContext.createCalciteParser()
-    },
-    new JFunction[TableSchema, SqlExprToRexConverter] {
-      override def apply(t: TableSchema): SqlExprToRexConverter = {
-        
sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
-      }
-    }
-  )
   val plannerContext = new PlannerContext(
     config,
     functionCatalog,
@@ -102,9 +81,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: 
Boolean) {
     GenericRowData.of(TimestampData.fromEpochMillis(6000L), JInt.valueOf(8))
   )
 
-  private def createSqlExprToRexConverter(tableRowType: RelDataType): 
SqlExprToRexConverter =
-    plannerContext.createSqlExprToRexConverter(tableRowType)
-
   @Test
   def testAscendingWatermark(): Unit = {
     val generator = generateWatermarkGenerator("ts - INTERVAL '0.001' SECOND",
@@ -202,7 +178,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: 
Boolean) {
         .getContext
         .unwrap(classOf[FlinkContext])
         .getSqlExprToRexConverterFactory
-        .create(tableRowType)
+        .create(tableRowType, null)
     val rexNode = converter.convertToRexNode(expr)
 
     if (useDefinedConstructor) {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 69a2d18..f0ecc24 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -251,7 +251,7 @@ class FlinkRelMdHandlerTestBase {
       .unwrap(classOf[FlinkContext])
     val watermarkRexNode = flinkContext
       .getSqlExprToRexConverterFactory
-      .create(scan.getTable.getRowType)
+      .create(scan.getTable.getRowType, null)
       .convertToRexNode("rowtime - INTERVAL '10' SECOND")
 
     relBuilder.push(scan)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
index 2c2c4ef..11c0d02 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner;
 
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -29,12 +28,16 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.parse.CalciteParser;
 import org.apache.flink.table.parse.ExtendedParser;
 import org.apache.flink.table.sqlexec.SqlToOperationConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.advise.SqlAdvisor;
 import org.apache.calcite.sql.advise.SqlAdvisorValidator;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,7 +108,8 @@ public class ParserImpl implements Parser {
     }
 
     @Override
-    public ResolvedExpression parseSqlExpression(String sqlExpression, 
TableSchema inputSchema) {
+    public ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType 
outputType) {
         throw new UnsupportedOperationException(
                 "Computed columns is only supported by the Blink planner.");
     }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 8984550..0cadaef 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -44,6 +44,7 @@ import org.apache.flink.table.parse.CalciteParser
 import org.apache.flink.table.planner.{ParserImpl, 
PlanningConfigurationBuilder}
 import org.apache.flink.table.sinks.{BatchSelectTableSink, BatchTableSink, 
OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, 
TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.table.types.{AbstractDataType, DataType}
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.table.utils.PrintUtils
@@ -112,7 +113,10 @@ abstract class TableEnvImpl(
     catalogManager.getDataTypeFactory,
     tableLookup,
     new SqlExpressionResolver {
-      override def resolveExpression(sqlExpression: String, inputSchema: 
TableSchema)
+      override def resolveExpression(
+          sqlExpression: String,
+          inputRowType: RowType,
+          outputType: LogicalType)
         : ResolvedExpression = {
             throw new UnsupportedOperationException(
               "SQL expression parsing is only supported in the Blink planner.")
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
index c24775f..c8cf4fa 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
@@ -70,9 +70,11 @@ public class OutputConversionOperator extends 
TableStreamOperator<Object>
         final RowData rowData = element.getValue();
 
         if (consumeRowtimeMetadata) {
+            // timestamp is TIMESTAMP_LTZ
             final long rowtime = rowData.getTimestamp(rowData.getArity() - 1, 
3).getMillisecond();
             outRecord.setTimestamp(rowtime);
         } else if (rowtimeIndex != -1) {
+            // timestamp might be TIMESTAMP or TIMESTAMP_LTZ
             final long rowtime = rowData.getTimestamp(rowtimeIndex, 
3).getMillisecond();
             outRecord.setTimestamp(rowtime);
         }

Reply via email to