This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new ebf50c4 [FLINK-22378][table] Derive type of SOURCE_WATERMARK() from time attribute
ebf50c4 is described below
commit ebf50c48ccc5e93d58a21e825a8542165a3fc129
Author: Timo Walther <[email protected]>
AuthorDate: Thu Apr 22 17:28:38 2021 +0200
[FLINK-22378][table] Derive type of SOURCE_WATERMARK() from time attribute
This closes #15730.
---
.../table/planner/delegation/hive/HiveParser.java | 6 +-
.../planner/delegation/hive/HiveParserFactory.java | 6 --
.../table/api/internal/TableEnvironmentImpl.java | 6 +-
.../flink/table/catalog/DefaultSchemaResolver.java | 25 +++++---
.../org/apache/flink/table/delegation/Parser.java | 11 +++-
.../expressions/resolver/ExpressionResolver.java | 18 ++++++
.../resolver/SqlExpressionResolver.java | 8 ++-
.../resolver/rules/ResolveSqlCallRule.java | 45 +++++++++++---
.../expressions/resolver/rules/ResolverRule.java | 4 ++
.../catalog/CatalogBaseTableResolutionTest.java | 6 +-
.../flink/table/catalog/SchemaResolutionTest.java | 30 +++++----
.../resolver/ExpressionResolverTest.java | 2 +-
.../utils/ValuesOperationTreeBuilderTest.java | 2 +-
.../flink/table/utils/ExpressionResolverMocks.java | 2 +-
.../org/apache/flink/table/utils/ParserMock.java | 8 ++-
.../table/connector/sink/DynamicTableSink.java | 7 +--
.../functions/BuiltInFunctionDefinitions.java | 2 +-
.../table/types/inference/TypeStrategies.java | 16 +++++
.../planner/calcite/FlinkCalciteSqlValidator.java | 17 ++++++
.../planner/calcite/SqlExprToRexConverter.java | 3 +
.../calcite/SqlExprToRexConverterFactory.java | 19 +++++-
.../planner/calcite/SqlExprToRexConverterImpl.java | 19 ++++--
.../planner/delegation/DefaultParserFactory.java | 7 +--
.../flink/table/planner/delegation/ParserImpl.java | 17 +++---
.../table/planner/delegation/PlannerContext.java | 55 +++++++++++++----
.../expressions/CallExpressionResolver.java | 2 +-
.../inference/TypeInferenceReturnInference.java | 20 +++++-
.../plan/abilities/source/FilterPushDownSpec.java | 2 +-
.../table/planner/calcite/FlinkPlannerImpl.scala | 38 +++++++-----
.../table/planner/delegation/PlannerBase.scala | 7 +--
.../plan/schema/LegacyCatalogSourceTable.scala | 4 +-
.../table/planner/delegation/ParserImplTest.java | 4 +-
.../operations/SqlToOperationConverterTest.java | 18 +++---
.../runtime/stream/sql/DataStreamJavaITCase.java | 71 ++++++++++++++++++++--
.../flink/table/planner/utils/PlannerMocks.java | 4 +-
.../codegen/WatermarkGeneratorCodeGenTest.scala | 26 +-------
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 2 +-
.../org/apache/flink/table/planner/ParserImpl.java | 8 ++-
.../flink/table/api/internal/TableEnvImpl.scala | 6 +-
.../operators/sink/OutputConversionOperator.java | 2 +
40 files changed, 390 insertions(+), 165 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index b231390..b2218c9 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.delegation.hive;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
@@ -36,7 +35,6 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
import
org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseException;
@@ -80,7 +78,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.function.Function;
import java.util.function.Supplier;
/** A Parser that uses Hive's planner to parse a statement. */
@@ -176,13 +173,12 @@ public class HiveParser extends ParserImpl {
CatalogManager catalogManager,
Supplier<FlinkPlannerImpl> validatorSupplier,
Supplier<CalciteParser> calciteParserSupplier,
- Function<TableSchema, SqlExprToRexConverter>
sqlExprToRexConverterCreator,
PlannerContext plannerContext) {
super(
catalogManager,
validatorSupplier,
calciteParserSupplier,
- sqlExprToRexConverterCreator);
+ plannerContext.getSqlExprToRexConverterFactory());
this.plannerContext = plannerContext;
this.catalogReader =
plannerContext.createCatalogReader(
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
index b23603e..dab4c5e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.delegation.ParserFactory;
import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -36,8 +35,6 @@ public class HiveParserFactory implements ParserFactory {
@Override
public Parser create(CatalogManager catalogManager, PlannerContext
plannerContext) {
- SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
- plannerContext::createSqlExprToRexConverter;
return new HiveParser(
catalogManager,
() ->
@@ -45,9 +42,6 @@ public class HiveParserFactory implements ParserFactory {
catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase()),
plannerContext::createCalciteParser,
- tableSchema ->
- sqlExprToRexConverterFactory.create(
-
plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
plannerContext);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9bb3530..bb6dd58 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -255,9 +255,11 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
return Optional.empty();
}
},
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
try {
- return
getParser().parseSqlExpression(sqlExpression, inputSchema);
+ return getParser()
+ .parseSqlExpression(
+ sqlExpression, inputRowType,
outputType);
} catch (Throwable t) {
throw new ValidationException(
String.format("Invalid SQL expression:
%s", sqlExpression),
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
index 5b008cc..0993669 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -144,7 +144,8 @@ class DefaultSchemaResolver implements SchemaResolver {
UnresolvedComputedColumn unresolvedColumn, List<Column>
inputColumns) {
final ResolvedExpression resolvedExpression;
try {
- resolvedExpression = resolveExpression(inputColumns,
unresolvedColumn.getExpression());
+ resolvedExpression =
+ resolveExpression(inputColumns,
unresolvedColumn.getExpression(), null);
} catch (Exception e) {
throw new ValidationException(
String.format(
@@ -189,22 +190,26 @@ class DefaultSchemaResolver implements SchemaResolver {
final ResolvedExpression watermarkExpression;
try {
watermarkExpression =
- resolveExpression(inputColumns,
watermarkSpec.getWatermarkExpression());
+ resolveExpression(
+ inputColumns,
+ watermarkSpec.getWatermarkExpression(),
+ validatedTimeColumn.getDataType());
} catch (Exception e) {
throw new ValidationException(
String.format(
"Invalid expression for watermark '%s'.",
watermarkSpec.toString()),
e);
}
-
validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+ final LogicalType outputType =
watermarkExpression.getOutputDataType().getLogicalType();
+ final LogicalType timeColumnType =
validatedTimeColumn.getDataType().getLogicalType();
+ validateWatermarkExpression(outputType);
- if
(!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
- ==
validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
+ if (outputType.getTypeRoot() != timeColumnType.getTypeRoot()) {
throw new ValidationException(
String.format(
- "The watermark output type %s is different from
input time filed type %s.",
- watermarkExpression.getOutputDataType(),
- validatedTimeColumn.getDataType()));
+ "The watermark declaration's output data type '%s'
is different "
+ + "from the time field's data type '%s'.",
+ outputType, timeColumnType));
}
return Collections.singletonList(
@@ -348,13 +353,15 @@ class DefaultSchemaResolver implements SchemaResolver {
}
}
- private ResolvedExpression resolveExpression(List<Column> columns,
Expression expression) {
+ private ResolvedExpression resolveExpression(
+ List<Column> columns, Expression expression, @Nullable DataType
outputDataType) {
final LocalReferenceExpression[] localRefs =
columns.stream()
.map(c -> localRef(c.getName(), c.getDataType()))
.toArray(LocalReferenceExpression[]::new);
return resolverBuilder
.withLocalReferences(localRefs)
+ .withOutputDataType(outputDataType)
.build()
.resolve(Collections.singletonList(expression))
.get(0);
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
index 20f07e1..0ff4567 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
@@ -19,11 +19,14 @@
package org.apache.flink.table.delegation;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
import java.util.List;
@@ -58,11 +61,13 @@ public interface Parser {
* Entry point for parsing SQL expressions expressed as a String.
*
* @param sqlExpression the SQL expression to parse
- * @param inputSchema the schema of the fields in sql expression
+ * @param inputRowType the fields available in the SQL expression
+ * @param outputType expected top-level output type if available
* @return resolved expression
* @throws org.apache.flink.table.api.SqlParserException when failed to
parse the sql expression
*/
- ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema
inputSchema);
+ ResolvedExpression parseSqlExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType);
/**
* Returns completion hints for the given statement at the given cursor
position. The completion
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 3c58f0f..fce7309 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -43,6 +43,8 @@ import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -117,6 +119,8 @@ public class ExpressionResolver {
private final Map<String, LocalReferenceExpression> localReferences;
+ private final @Nullable DataType outputDataType;
+
private final Map<Expression, LocalOverWindow> localOverWindows;
private final boolean isGroupedAggregation;
@@ -130,6 +134,7 @@ public class ExpressionResolver {
FieldReferenceLookup fieldLookup,
List<OverWindow> localOverWindows,
List<LocalReferenceExpression> localReferences,
+ @Nullable DataType outputDataType,
boolean isGroupedAggregation) {
this.config = Preconditions.checkNotNull(config).getConfiguration();
this.tableLookup = Preconditions.checkNotNull(tableLookup);
@@ -149,6 +154,7 @@ public class ExpressionResolver {
"Duplicate local
reference: " + u);
},
LinkedHashMap::new));
+ this.outputDataType = outputDataType;
this.localOverWindows = prepareOverWindows(localOverWindows);
this.isGroupedAggregation = isGroupedAggregation;
}
@@ -324,6 +330,11 @@ public class ExpressionResolver {
}
@Override
+ public Optional<DataType> getOutputDataType() {
+ return Optional.ofNullable(outputDataType);
+ }
+
+ @Override
public Optional<LocalOverWindow> getOverWindow(Expression alias) {
return Optional.ofNullable(localOverWindows.get(alias));
}
@@ -443,6 +454,7 @@ public class ExpressionResolver {
private final SqlExpressionResolver sqlExpressionResolver;
private List<OverWindow> logicalOverWindows = new ArrayList<>();
private List<LocalReferenceExpression> localReferences = new
ArrayList<>();
+ private @Nullable DataType outputDataType;
private boolean isGroupedAggregation;
private ExpressionResolverBuilder(
@@ -471,6 +483,11 @@ public class ExpressionResolver {
return this;
}
+ public ExpressionResolverBuilder withOutputDataType(@Nullable DataType
outputDataType) {
+ this.outputDataType = outputDataType;
+ return this;
+ }
+
public ExpressionResolverBuilder withGroupedAggregation(boolean
isGroupedAggregation) {
this.isGroupedAggregation = isGroupedAggregation;
return this;
@@ -486,6 +503,7 @@ public class ExpressionResolver {
new FieldReferenceLookup(queryOperations),
logicalOverWindows,
localReferences,
+ outputDataType,
isGroupedAggregation);
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
index fd1bd7a..516837d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java
@@ -19,14 +19,18 @@
package org.apache.flink.table.expressions.resolver;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
/** Translates a SQL expression string into a {@link ResolvedExpression}. */
@Internal
public interface SqlExpressionResolver {
/** Translates the given SQL expression string or throws a {@link
ValidationException}. */
- ResolvedExpression resolveExpression(String sqlExpression, TableSchema
inputSchema);
+ ResolvedExpression resolveExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
index cf2d74b..fb8de72 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -19,13 +19,19 @@
package org.apache.flink.table.expressions.resolver.rules;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,32 +43,49 @@ final class ResolveSqlCallRule implements ResolverRule {
@Override
public List<Expression> apply(List<Expression> expression,
ResolutionContext context) {
- return expression.stream()
- .map(expr -> expr.accept(new
TranslateSqlCallsVisitor(context)))
- .collect(Collectors.toList());
+ // only the top-level expressions may access the output data type
+ final LogicalType outputType =
+
context.getOutputDataType().map(DataType::getLogicalType).orElse(null);
+ final TranslateSqlCallsVisitor visitor = new
TranslateSqlCallsVisitor(context, outputType);
+ return expression.stream().map(expr ->
expr.accept(visitor)).collect(Collectors.toList());
}
private static class TranslateSqlCallsVisitor extends
RuleExpressionVisitor<Expression> {
- TranslateSqlCallsVisitor(ResolutionContext resolutionContext) {
+ private final @Nullable LogicalType outputType;
+
+ TranslateSqlCallsVisitor(
+ ResolutionContext resolutionContext, @Nullable LogicalType
outputType) {
super(resolutionContext);
+ this.outputType = outputType;
}
@Override
public Expression visit(SqlCallExpression sqlCall) {
final SqlExpressionResolver resolver =
resolutionContext.sqlExpressionResolver();
- final TableSchema.Builder builder = TableSchema.builder();
+ final List<RowField> fields = new ArrayList<>();
// input references
resolutionContext
.referenceLookup()
.getAllInputFields()
- .forEach(f -> builder.field(f.getName(),
f.getOutputDataType()));
+ .forEach(
+ f ->
+ fields.add(
+ new RowField(
+ f.getName(),
+
f.getOutputDataType().getLogicalType())));
// local references
resolutionContext
.getLocalReferences()
- .forEach(refs -> builder.field(refs.getName(),
refs.getOutputDataType()));
- return resolver.resolveExpression(sqlCall.getSqlExpression(),
builder.build());
+ .forEach(
+ refs ->
+ fields.add(
+ new RowField(
+ refs.getName(),
+
refs.getOutputDataType().getLogicalType())));
+ return resolver.resolveExpression(
+ sqlCall.getSqlExpression(), new RowType(false, fields),
outputType);
}
@Override
@@ -76,8 +99,10 @@ final class ResolveSqlCallRule implements ResolverRule {
}
private List<Expression> resolveChildren(List<Expression>
lookupChildren) {
+ final TranslateSqlCallsVisitor visitor =
+ new TranslateSqlCallsVisitor(resolutionContext, null);
return lookupChildren.stream()
- .map(child -> child.accept(this))
+ .map(child -> child.accept(visitor))
.collect(Collectors.toList());
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index bc212ce..0ee8c5e 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
import
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup;
import
org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
import java.util.List;
import java.util.Optional;
@@ -87,6 +88,9 @@ public interface ResolverRule {
/** Access to available local references. */
List<LocalReferenceExpression> getLocalReferences();
+ /** Access to the expected top-level output data type. */
+ Optional<DataType> getOutputDataType();
+
/** Access to available local over windows. */
Optional<LocalOverWindow> getOverWindow(Expression alias);
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index cef78b3..5085ad7 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -26,11 +26,15 @@ import
org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.expressions.ResolvedExpression;
import
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -268,7 +272,7 @@ public class CatalogBaseTableResolutionTest {
}
private static ResolvedExpression resolveSqlExpression(
- String sqlExpression, TableSchema inputSchema) {
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType) {
switch (sqlExpression) {
case COMPUTED_SQL:
return COMPUTED_COLUMN_RESOLVED;
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index aeb5bda..6e0edd6 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -21,18 +21,20 @@ package org.apache.flink.table.catalog;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collections;
@@ -217,7 +219,8 @@ public class SchemaResolutionTest {
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", callSql(INVALID_WATERMARK_SQL))
.build(),
- "The watermark output type TIMESTAMP_LTZ(3) is different from
input time filed type TIMESTAMP(3).");
+ "The watermark declaration's output data type
'TIMESTAMP_LTZ(3)' is "
+ + "different from the time field's data type
'TIMESTAMP(3)'.");
testError(
Schema.newBuilder()
@@ -440,27 +443,27 @@ public class SchemaResolutionTest {
}
private static ResolvedExpression resolveSqlExpression(
- String sqlExpression, TableSchema inputSchema) {
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType) {
switch (sqlExpression) {
case COMPUTED_SQL:
assertThat(
- inputSchema.getFieldDataType("orig_ts").orElse(null),
- equalTo(DataTypes.TIMESTAMP(3)));
+ getType(inputRowType, "orig_ts"),
+ equalTo(DataTypes.TIMESTAMP(3).getLogicalType()));
return COMPUTED_COLUMN_RESOLVED;
case COMPUTED_SQL_WITH_TS_LTZ:
assertThat(
- inputSchema.getFieldDataType("ts_ltz").orElse(null),
- equalTo(DataTypes.TIMESTAMP_LTZ(3)));
+ getType(inputRowType, "ts_ltz"),
+ equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType()));
return COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ;
case WATERMARK_SQL:
assertThat(
- inputSchema.getFieldDataType("ts").orElse(null),
- equalTo(DataTypes.TIMESTAMP(3)));
+ getType(inputRowType, "ts"),
+ equalTo(DataTypes.TIMESTAMP(3).getLogicalType()));
return WATERMARK_RESOLVED;
case WATERMARK_SQL_WITH_TS_LTZ:
assertThat(
- inputSchema.getFieldDataType("ts1").orElse(null),
- equalTo(DataTypes.TIMESTAMP_LTZ(3)));
+ getType(inputRowType, "ts1"),
+ equalTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType()));
return WATERMARK_RESOLVED_WITH_TS_LTZ;
case PROCTIME_SQL:
return PROCTIME_RESOLVED;
@@ -478,4 +481,9 @@ public class SchemaResolutionTest {
.getDataType()
.getLogicalType();
}
+
+ private static LogicalType getType(RowType inputRowType, String field) {
+ final int pos = inputRowType.getFieldIndex(field);
+ return inputRowType.getTypeAt(pos);
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index f48033d..b89d09b 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -332,7 +332,7 @@ public class ExpressionResolverTest {
name -> Optional.empty(),
new FunctionLookupMock(functions),
new DataTypeFactoryMock(),
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
throw new UnsupportedOperationException();
},
Arrays.stream(schemas)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
index a18c1be..d011b9c 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/utils/ValuesOperationTreeBuilderTest.java
@@ -496,7 +496,7 @@ public class ValuesOperationTreeBuilderTest {
new FunctionLookupMock(Collections.emptyMap()),
new DataTypeFactoryMock(),
name -> Optional.empty(), // do not support
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
throw new UnsupportedOperationException();
},
true);
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
index 4deb785..68970fe 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExpressionResolverMocks.java
@@ -47,7 +47,7 @@ public final class ExpressionResolverMocks {
public static ExpressionResolverBuilder dummyResolver() {
return forSqlExpression(
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
throw new UnsupportedOperationException();
});
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
index c56ee9c..526c682 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
@@ -18,11 +18,14 @@
package org.apache.flink.table.utils;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
import java.util.List;
@@ -39,7 +42,8 @@ public class ParserMock implements Parser {
}
@Override
- public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ public ResolvedExpression parseSqlExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType) {
return null;
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 74f94f8..ad58228 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.connector.sink;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.RuntimeConverter;
@@ -96,8 +96,7 @@ public interface DynamicTableSink {
* interfaces might be located in other Flink modules.
*
* <p>Independent of the provider interface, the table runtime expects
that a sink
- * implementation accepts internal data structures (see {@link
- * org.apache.flink.table.data.RowData} for more information).
+ * implementation accepts internal data structures (see {@link RowData}
for more information).
*
* <p>The given {@link Context} offers utilities by the planner for
creating runtime
* implementation with minimal dependencies to internal data structures.
@@ -146,7 +145,7 @@ public interface DynamicTableSink {
* Creates type information describing the internal data structures of
the given {@link
* DataType}.
*
- * @see TableSchema#toPhysicalRowDataType()
+ * @see ResolvedSchema#toPhysicalRowDataType()
*/
<T> TypeInformation<T> createTypeInformation(DataType
consumedDataType);
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index ce4f359..7d626a2 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -117,7 +117,7 @@ public final class BuiltInFunctionDefinitions {
.name("SOURCE_WATERMARK")
.kind(SCALAR)
.inputTypeStrategy(NO_ARGS)
- .outputTypeStrategy(explicit(DataTypes.TIMESTAMP(3)))
+ .outputTypeStrategy(TypeStrategies.SOURCE_WATERMARK)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction")
.build();
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
index 1815ed1..d65b151 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
@@ -69,6 +69,7 @@ public final class TypeStrategies {
/** Placeholder for a missing type strategy. */
public static final TypeStrategy MISSING = new MissingTypeStrategy();
+ /** Type strategy that returns a common, least restrictive type of all
arguments. */
public static final TypeStrategy COMMON = new CommonTypeStrategy();
/** Type strategy that returns a fixed {@link DataType}. */
@@ -418,6 +419,21 @@ public final class TypeStrategies {
return Optional.of(nullReplacementDataType);
};
+ /** Type strategy specific for source watermarks that depend on the output
type. */
+ public static final TypeStrategy SOURCE_WATERMARK =
+ callContext -> {
+ final DataType timestampDataType =
+ callContext
+ .getOutputDataType()
+ .filter(
+ dt ->
+ hasFamily(
+ dt.getLogicalType(),
+ LogicalTypeFamily.TIMESTAMP))
+ .orElse(DataTypes.TIMESTAMP_LTZ(3));
+ return Optional.of(timestampDataType);
+ };
+
/**
* Type strategy specific for aggregations that partially produce
different nullability
* depending whether the result is grouped or not.
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 6ca4202..bd5a4a8 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -42,6 +42,7 @@ import org.apache.calcite.util.Static;
import java.math.BigDecimal;
import java.util.List;
+import java.util.Optional;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
@@ -49,6 +50,10 @@ import static
org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
@Internal
public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
+ // Enables CallContext#getOutputDataType() when validating SQL expressions.
+ private SqlNode sqlNodeForExpectedOutputType;
+ private RelDataType expectedOutputType;
+
public FlinkCalciteSqlValidator(
SqlOperatorTable opTab,
SqlValidatorCatalogReader catalogReader,
@@ -57,6 +62,18 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
super(opTab, catalogReader, typeFactory, config);
}
+ public void setExpectedOutputType(SqlNode sqlNode, RelDataType
expectedOutputType) {
+ this.sqlNodeForExpectedOutputType = sqlNode;
+ this.expectedOutputType = expectedOutputType;
+ }
+
+ public Optional<RelDataType> getExpectedOutputType(SqlNode sqlNode) {
+ if (sqlNode == sqlNodeForExpectedOutputType) {
+ return Optional.of(expectedOutputType);
+ }
+ return Optional.empty();
+ }
+
@Override
public void validateLiteral(SqlLiteral literal) {
if (literal.getTypeName() == DECIMAL) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
index 473d4d4..49294c11 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverter.java
@@ -18,9 +18,12 @@
package org.apache.flink.table.planner.calcite;
+import org.apache.flink.annotation.Internal;
+
import org.apache.calcite.rex.RexNode;
/** Converts SQL expressions to {@link RexNode}. */
+@Internal
public interface SqlExprToRexConverter {
/**
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
index e1a8cc9..83ed640 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterFactory.java
@@ -18,13 +18,28 @@
package org.apache.flink.table.planner.calcite;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import javax.annotation.Nullable;
/** Factory to create {@link SqlExprToRexConverter}. */
+@Internal
public interface SqlExprToRexConverterFactory {
/**
- * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL
expression to RexNode.
+ * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL
expression to {@link
+ * RexNode}.
+ */
+ SqlExprToRexConverter create(RelDataType inputRowType, @Nullable
RelDataType outputType);
+
+ /**
+ * Creates a new instance of {@link SqlExprToRexConverter} to convert SQL
expression to {@link
+ * RexNode}.
*/
- SqlExprToRexConverter create(RelDataType tableRowType);
+ SqlExprToRexConverter create(RowType inputRowType, @Nullable LogicalType
outputType);
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
index c0fa221..8bc2d00 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java
@@ -31,6 +31,8 @@ import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Stream;
@@ -42,14 +44,17 @@ public class SqlExprToRexConverterImpl implements
SqlExprToRexConverter {
private final SqlDialect sqlDialect;
- private final RelDataType tableRowType;
+ private final RelDataType inputRowType;
+
+ private final @Nullable RelDataType outputType;
public SqlExprToRexConverterImpl(
FrameworkConfig config,
FlinkTypeFactory typeFactory,
RelOptCluster cluster,
SqlDialect sqlDialect,
- RelDataType tableRowType) {
+ RelDataType inputRowType,
+ @Nullable RelDataType outputType) {
this.planner =
new FlinkPlannerImpl(
config,
@@ -57,20 +62,22 @@ public class SqlExprToRexConverterImpl implements
SqlExprToRexConverter {
typeFactory,
cluster);
this.sqlDialect = sqlDialect;
- this.tableRowType = tableRowType;
+ this.inputRowType = inputRowType;
+ this.outputType = outputType;
}
@Override
public String expand(String expr) {
final CalciteParser parser = planner.parser();
final SqlNode node = parser.parseExpression(expr);
- final SqlNode validated = planner.validateExpression(node,
tableRowType);
+ final SqlNode validated = planner.validateExpression(node,
inputRowType, outputType);
return validated.toSqlString(sqlDialect).getSql();
}
@Override
public RexNode convertToRexNode(String expr) {
- return convertToRexNodes(new String[] {expr})[0];
+ final CalciteParser parser = planner.parser();
+ return planner.rex(parser.parseExpression(expr), inputRowType,
outputType);
}
@Override
@@ -78,7 +85,7 @@ public class SqlExprToRexConverterImpl implements
SqlExprToRexConverter {
final CalciteParser parser = planner.parser();
return Stream.of(exprs)
.map(parser::parseExpression)
- .map(node -> planner.rex(node, tableRowType))
+ .map(node -> planner.rex(node, inputRowType, null))
.toArray(RexNode[]::new);
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
index af968a0..99631f1 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import java.util.Collections;
import java.util.List;
@@ -33,8 +32,6 @@ import java.util.Map;
public class DefaultParserFactory implements ParserFactory {
@Override
public Parser create(CatalogManager catalogManager, PlannerContext
plannerContext) {
- SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
- plannerContext::createSqlExprToRexConverter;
return new ParserImpl(
catalogManager,
() ->
@@ -42,9 +39,7 @@ public class DefaultParserFactory implements ParserFactory {
catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase()),
plannerContext::createCalciteParser,
- tableSchema ->
- sqlExprToRexConverterFactory.create(
- plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
+ plannerContext.getSqlExprToRexConverterFactory());
}
@Override
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index e9136a5..e4f7d53 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.delegation;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
@@ -28,11 +27,13 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.operations.SqlToOperationConverter;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.parse.ExtendedParser;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.calcite.rex.RexNode;
@@ -41,12 +42,13 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.advise.SqlAdvisor;
import org.apache.calcite.sql.advise.SqlAdvisorValidator;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -60,18 +62,18 @@ public class ParserImpl implements Parser {
// multiple statements parsing
private final Supplier<FlinkPlannerImpl> validatorSupplier;
private final Supplier<CalciteParser> calciteParserSupplier;
- private final Function<TableSchema, SqlExprToRexConverter>
sqlExprToRexConverterCreator;
+ private final SqlExprToRexConverterFactory sqlExprToRexConverterFactory;
private static final ExtendedParser EXTENDED_PARSER =
ExtendedParser.INSTANCE;
public ParserImpl(
CatalogManager catalogManager,
Supplier<FlinkPlannerImpl> validatorSupplier,
Supplier<CalciteParser> calciteParserSupplier,
- Function<TableSchema, SqlExprToRexConverter>
sqlExprToRexConverterCreator) {
+ SqlExprToRexConverterFactory sqlExprToRexConverterFactory) {
this.catalogManager = catalogManager;
this.validatorSupplier = validatorSupplier;
this.calciteParserSupplier = calciteParserSupplier;
- this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator;
+ this.sqlExprToRexConverterFactory = sqlExprToRexConverterFactory;
}
/**
@@ -109,9 +111,10 @@ public class ParserImpl implements Parser {
}
@Override
- public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ public ResolvedExpression parseSqlExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType) {
final SqlExprToRexConverter sqlExprToRexConverter =
- sqlExprToRexConverterCreator.apply(inputSchema);
+ sqlExprToRexConverterFactory.create(inputRowType, outputType);
final RexNode rexNode =
sqlExprToRexConverter.convertToRexNode(sqlExpression);
final LogicalType logicalType =
FlinkTypeFactory.toLogicalType(rexNode.getType());
// expand expression for serializable expression strings similar to
views
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 2d856d9..f9c9397 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.planner.calcite.FlinkRexBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl;
import org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable;
import org.apache.flink.table.planner.codegen.ExpressionReducer;
@@ -47,6 +48,8 @@ import
org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteSchema;
@@ -69,6 +72,8 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
+import javax.annotation.Nullable;
+
import java.util.List;
import static java.util.Arrays.asList;
@@ -87,6 +92,8 @@ public class PlannerContext {
private final RelDataTypeSystem typeSystem = new FlinkTypeSystem();
private final FlinkTypeFactory typeFactory = new
FlinkTypeFactory(typeSystem);
+ private final SqlExprToRexConverterFactory rexConverterFactory =
+ new DefaultSqlExprToRexConverterFactory();
private final TableConfig tableConfig;
private final RelOptCluster cluster;
private final FlinkContext context;
@@ -104,10 +111,7 @@ public class PlannerContext {
this.context =
new FlinkContextImpl(
- tableConfig,
- functionCatalog,
- catalogManager,
- this::createSqlExprToRexConverter);
+ tableConfig, functionCatalog, catalogManager,
rexConverterFactory);
this.rootSchema = rootSchema;
this.traitDefs = traitDefs;
@@ -125,13 +129,8 @@ public class PlannerContext {
this.cluster = FlinkRelOptClusterFactory.create(planner, new
FlinkRexBuilder(typeFactory));
}
- public SqlExprToRexConverter createSqlExprToRexConverter(RelDataType
rowType) {
- return new SqlExprToRexConverterImpl(
- checkNotNull(frameworkConfig),
- checkNotNull(typeFactory),
- checkNotNull(cluster),
- checkNotNull(getCalciteSqlDialect()),
- rowType);
+ public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
+ return rexConverterFactory;
}
public FrameworkConfig createFrameworkConfig() {
@@ -328,4 +327,38 @@ public class PlannerContext {
typeFactory),
FlinkSqlOperatorTable.instance());
}
+
+ // --------------------------------------------------------------------------------------------
+ // DefaultSqlExprToRexConverterFactory
+ // --------------------------------------------------------------------------------------------
+
+ private class DefaultSqlExprToRexConverterFactory implements
SqlExprToRexConverterFactory {
+
+ @Override
+ public SqlExprToRexConverter create(
+ RelDataType inputRowType, @Nullable RelDataType outputType) {
+ return new SqlExprToRexConverterImpl(
+ checkNotNull(frameworkConfig),
+ checkNotNull(typeFactory),
+ checkNotNull(cluster),
+ checkNotNull(getCalciteSqlDialect()),
+ inputRowType,
+ outputType);
+ }
+
+ @Override
+ public SqlExprToRexConverter create(
+ RowType inputRowType, @Nullable LogicalType outputType) {
+ final RelDataType convertedInputRowType =
typeFactory.buildRelNodeRowType(inputRowType);
+
+ final RelDataType convertedOutputType;
+ if (outputType != null) {
+ convertedOutputType =
typeFactory.createFieldTypeFromLogicalType(outputType);
+ } else {
+ convertedOutputType = null;
+ }
+
+ return create(convertedInputRowType, convertedOutputType);
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
index 9f2edac..8d37c48 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/CallExpressionResolver.java
@@ -52,7 +52,7 @@ public class CallExpressionResolver {
"We should not need to lookup any expressions at this point");
}),
context.getCatalogManager().getDataTypeFactory(),
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
throw new TableException(
"SQL expression parsing is not supported at this location.");
})
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
index 3e3ffd2..8bb177b 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeInference;
@@ -29,9 +30,12 @@ import
org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import javax.annotation.Nullable;
+
import static
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
import static
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException;
import static
org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
@@ -63,7 +67,11 @@ public final class TypeInferenceReturnInference implements
SqlReturnTypeInferenc
@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
final CallContext callContext =
- new OperatorBindingCallContext(dataTypeFactory, definition,
opBinding, null);
+ new OperatorBindingCallContext(
+ dataTypeFactory,
+ definition,
+ opBinding,
+ extractExpectedOutputType(opBinding));
try {
return inferReturnTypeOrError(unwrapTypeFactory(opBinding),
callContext);
} catch (ValidationException e) {
@@ -75,6 +83,16 @@ public final class TypeInferenceReturnInference implements
SqlReturnTypeInferenc
// --------------------------------------------------------------------------------------------
+ private @Nullable RelDataType extractExpectedOutputType(SqlOperatorBinding
opBinding) {
+ if (opBinding instanceof SqlCallBinding) {
+ final SqlCallBinding binding = (SqlCallBinding) opBinding;
+ final FlinkCalciteSqlValidator validator =
+ (FlinkCalciteSqlValidator) binding.getValidator();
+ return
validator.getExpectedOutputType(binding.getCall()).orElse(null);
+ }
+ return null;
+ }
+
private RelDataType inferReturnTypeOrError(
FlinkTypeFactory typeFactory, CallContext callContext) {
final LogicalType inferredType =
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 713eea3..1afc2c1 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -106,7 +106,7 @@ public class FilterPushDownSpec extends
SourceAbilitySpecBase {
"We should not need to lookup any expressions at this point");
}),
context.getCatalogManager().getDataTypeFactory(),
- (sqlExpression, inputSchema) -> {
+ (sqlExpression, inputRowType, outputType) -> {
throw new TableException(
"SQL expression parsing is not supported at this location.");
})
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 6e3afbc..e036e17 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -40,6 +40,8 @@ import org.apache.calcite.tools.{FrameworkConfig,
RelConversionException}
import org.apache.flink.sql.parser.ddl.SqlUseModules
import org.apache.flink.table.planner.parse.CalciteParser
+import javax.annotation.Nullable
+
import java.lang.{Boolean => JBoolean}
import java.util
import java.util.function.{Function => JFunction}
@@ -179,35 +181,43 @@ class FlinkPlannerImpl(
}
}
- def validateExpression(sqlNode: SqlNode, inputRowType: RelDataType): SqlNode
= {
- validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType)
+ def validateExpression(
+ sqlNode: SqlNode,
+ inputRowType: RelDataType,
+ @Nullable outputType: RelDataType): SqlNode = {
+ validateExpression(sqlNode, getOrCreateSqlValidator(), inputRowType,
outputType)
}
private def validateExpression(
sqlNode: SqlNode,
sqlValidator: FlinkCalciteSqlValidator,
- inputRowType: RelDataType): SqlNode = {
- val nameToTypeMap = inputRowType
- .getFieldList
+ inputRowType: RelDataType,
+ @Nullable outputType: RelDataType)
+ : SqlNode = {
+ val nameToTypeMap = new util.HashMap[String, RelDataType]()
+ inputRowType.getFieldList
.asScala
- .map { field =>
- (field.getName, field.getType)
- }
- .toMap[String, RelDataType]
- .asJava
+ .foreach(f => nameToTypeMap.put(f.getName, f.getType))
+ if (outputType != null) {
+ sqlValidator.setExpectedOutputType(sqlNode, outputType)
+ }
sqlValidator.validateParameterizedExpression(sqlNode, nameToTypeMap)
}
- def rex(sqlNode: SqlNode, inputRowType: RelDataType): RexNode = {
- rex(sqlNode, getOrCreateSqlValidator(), inputRowType)
+ def rex(
+ sqlNode: SqlNode,
+ inputRowType: RelDataType,
+ @Nullable outputType: RelDataType): RexNode = {
+ rex(sqlNode, getOrCreateSqlValidator(), inputRowType, outputType)
}
private def rex(
sqlNode: SqlNode,
sqlValidator: FlinkCalciteSqlValidator,
- inputRowType: RelDataType) = {
+ inputRowType: RelDataType,
+ @Nullable outputType: RelDataType) = {
try {
- val validatedSqlNode = validateExpression(sqlNode, sqlValidator,
inputRowType)
+ val validatedSqlNode = validateExpression(sqlNode, sqlValidator,
inputRowType, outputType)
val sqlToRelConverter = createSqlToRelConverter(sqlValidator)
val nameToNodeMap = inputRowType
.getFieldList
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 8a0902b..e676771 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -92,11 +92,6 @@ abstract class PlannerBase(
// temporary utility until we don't use planner expressions anymore
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
- private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory {
- override def create(tableRowType: RelDataType): SqlExprToRexConverter =
- plannerContext.createSqlExprToRexConverter(tableRowType)
- }
-
private var parser: Parser = _
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
@@ -110,6 +105,8 @@ abstract class PlannerBase(
getTraitDefs.toList
)
+ private val sqlExprToRexConverterFactory =
plannerContext.getSqlExprToRexConverterFactory
+
/** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: FlinkRelBuilder = {
val currentCatalogName = catalogManager.getCurrentCatalog
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index b60ae51..c0e8a76 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -135,7 +135,7 @@ class LegacyCatalogSourceTable[T](
s"`$name`"
}
}.toArray
- val rexNodes =
toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
+ val rexNodes = toRexFactory.create(newRelTable.getRowType,
null).convertToRexNodes(fieldExprs)
relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
}
@@ -158,7 +158,7 @@ class LegacyCatalogSourceTable[T](
}
val rowtimeIndex = fieldNames.indexOf(rowtime)
val watermarkRexNode = toRexFactory
- .create(actualRowType)
+ .create(actualRowType, null)
.convertToRexNode(watermarkSpec.get.getWatermarkExpr)
relBuilder.watermark(rowtimeIndex, watermarkRexNode)
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
index fc70477..1c24173 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
@@ -76,9 +76,7 @@ public class ParserImplTest {
catalogManager,
plannerSupplier,
() -> plannerSupplier.get().parser(),
- t ->
- plannerContext.createSqlExprToRexConverter(
- plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+ plannerContext.getSqlExprToRexConverterFactory());
private static final List<TestSpec> TEST_SPECS =
Arrays.asList(
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index cc5f7bf..26cd1cc 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -129,17 +129,6 @@ public class SqlToOperationConverterTest {
.createFlinkPlanner(
catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase());
- private final Parser parser =
- new ParserImpl(
- catalogManager,
- plannerSupplier,
- () -> plannerSupplier.get().parser(),
- t ->
- getPlannerContext()
- .createSqlExprToRexConverter(
- getPlannerContext()
- .getTypeFactory()
- .buildRelNodeRowType(t)));
private final PlannerContext plannerContext =
new PlannerContext(
tableConfig,
@@ -148,6 +137,13 @@ public class SqlToOperationConverterTest {
asRootSchema(new
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
Collections.emptyList());
+ private final Parser parser =
+ new ParserImpl(
+ catalogManager,
+ plannerSupplier,
+ () -> plannerSupplier.get().parser(),
+ getPlannerContext().getSqlExprToRexConverterFactory());
+
private PlannerContext getPlannerContext() {
return plannerContext;
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 9ead357..2f0566a 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -28,11 +28,13 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
@@ -48,6 +50,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;
@@ -57,6 +60,8 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.time.DayOfWeek;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
@@ -246,7 +251,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
- .columnByMetadata("rowtime", "TIMESTAMP(3)")
+ .columnByMetadata("rowtime",
"TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
@@ -257,12 +262,14 @@ public class DataStreamJavaITCase extends
AbstractTestBase {
Column.physical("f0",
DataTypes.BIGINT().notNull()),
Column.physical("f1",
DataTypes.INT().notNull()),
Column.physical("f2", DataTypes.STRING()),
- Column.metadata("rowtime",
DataTypes.TIMESTAMP(3), null, false)),
+ Column.metadata(
+ "rowtime", DataTypes.TIMESTAMP_LTZ(3),
null, false)),
Collections.singletonList(
WatermarkSpec.of(
"rowtime",
ResolvedExpressionMock.of(
- DataTypes.TIMESTAMP(3),
"`SOURCE_WATERMARK`()"))),
+ DataTypes.TIMESTAMP_LTZ(3),
+ "`SOURCE_WATERMARK`()"))),
null));
tableEnv.createTemporaryView("t", table);
@@ -305,7 +312,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
tableEnv.fromChangelogStream(
changelogStream,
Schema.newBuilder()
- .columnByMetadata("rowtime",
DataTypes.TIMESTAMP(3))
+ .columnByMetadata("rowtime",
DataTypes.TIMESTAMP_LTZ(3))
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
tableEnv.createTemporaryView("t", table);
@@ -431,6 +438,62 @@ public class DataStreamJavaITCase extends AbstractTestBase
{
getOutput(inputOrOutput));
}
+    /**
+     * Declares a {@code TIMESTAMP(3)} column as the event-time attribute via {@code
+     * SOURCE_WATERMARK()} and checks both the resolved schema and the record timestamps seen by
+     * a downstream {@link ProcessFunction}.
+     *
+     * <p>The session time zone is switched to Europe/Berlin while the test runs; the expected
+     * timestamps are nevertheless the epoch millis of the input values interpreted at UTC.
+     */
+    @Test
+    public void testToDataStreamCustomEventTime() throws Exception {
+        final TableConfig config = tableEnv.getConfig();
+
+        // session time zone should not have an impact on the conversion
+        final ZoneId originalZone = config.getLocalTimeZone();
+        config.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
+
+        // restore the zone in a finally block so that it is reset even when an assertion or
+        // the job execution below fails
+        try {
+            final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
+            final LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
+
+            final DataStream<Tuple2<LocalDateTime, String>> dataStream =
+                    env.fromElements(
+                            new Tuple2<>(localDateTime1, "alice"),
+                            new Tuple2<>(localDateTime2, "bob"));
+
+            final Table table =
+                    tableEnv.fromDataStream(
+                            dataStream,
+                            Schema.newBuilder()
+                                    .column("f0", "TIMESTAMP(3)")
+                                    .column("f1", "STRING")
+                                    .watermark("f0", "SOURCE_WATERMARK()")
+                                    .build());
+
+            // the watermark expression is expected to take the type of the time attribute
+            testSchema(
+                    table,
+                    new ResolvedSchema(
+                            Arrays.asList(
+                                    Column.physical("f0", DataTypes.TIMESTAMP(3)),
+                                    Column.physical("f1", DataTypes.STRING())),
+                            Collections.singletonList(
+                                    WatermarkSpec.of(
+                                            "f0",
+                                            ResolvedExpressionMock.of(
+                                                    DataTypes.TIMESTAMP(3),
+                                                    "`SOURCE_WATERMARK`()"))),
+                            null));
+
+            // forward the timestamp attached to each stream record
+            final DataStream<Long> rowtimeStream =
+                    tableEnv.toDataStream(table)
+                            .process(
+                                    new ProcessFunction<Row, Long>() {
+                                        @Override
+                                        public void processElement(
+                                                Row value, Context ctx, Collector<Long> out) {
+                                            out.collect(ctx.timestamp());
+                                        }
+                                    });
+
+            testResult(
+                    rowtimeStream,
+                    localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(),
+                    localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli());
+        } finally {
+            config.setLocalTimeZone(originalZone);
+        }
+    }
+
// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index f7dcf76..becbed1 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -59,9 +59,7 @@ public class PlannerMocks {
catalogManager,
() -> planner,
planner::parser,
- t ->
- plannerContext.createSqlExprToRexConverter(
-
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+ plannerContext.getSqlExprToRexConverterFactory());
catalogManager.initSchemaResolver(
isStreamingMode,
ExpressionResolver.resolverFor(
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 90c5010..00350e0 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -60,27 +60,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor:
Boolean) {
val config = new TableConfig
val catalogManager: CatalogManager =
CatalogManagerMocks.createEmptyCatalogManager()
val functionCatalog = new FunctionCatalog(config, catalogManager, new
ModuleManager)
- private val sqlExprToRexConverterFactory = new SqlExprToRexConverterFactory {
- override def create(tableRowType: RelDataType): SqlExprToRexConverter =
- createSqlExprToRexConverter(tableRowType)
- }
- private val parser: Parser = new ParserImpl(
- catalogManager,
- new JSupplier[FlinkPlannerImpl] {
- override def get(): FlinkPlannerImpl = getPlanner
- },
- // we do not cache the parser in order to use the most up to
- // date configuration. Users might change parser configuration in
TableConfig in between
- // parsing statements
- new JSupplier[CalciteParser] {
- override def get(): CalciteParser = plannerContext.createCalciteParser()
- },
- new JFunction[TableSchema, SqlExprToRexConverter] {
- override def apply(t: TableSchema): SqlExprToRexConverter = {
-
sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
- }
- }
- )
val plannerContext = new PlannerContext(
config,
functionCatalog,
@@ -102,9 +81,6 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor:
Boolean) {
GenericRowData.of(TimestampData.fromEpochMillis(6000L), JInt.valueOf(8))
)
- private def createSqlExprToRexConverter(tableRowType: RelDataType):
SqlExprToRexConverter =
- plannerContext.createSqlExprToRexConverter(tableRowType)
-
@Test
def testAscendingWatermark(): Unit = {
val generator = generateWatermarkGenerator("ts - INTERVAL '0.001' SECOND",
@@ -202,7 +178,7 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor:
Boolean) {
.getContext
.unwrap(classOf[FlinkContext])
.getSqlExprToRexConverterFactory
- .create(tableRowType)
+ .create(tableRowType, null)
val rexNode = converter.convertToRexNode(expr)
if (useDefinedConstructor) {
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 69a2d18..f0ecc24 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -251,7 +251,7 @@ class FlinkRelMdHandlerTestBase {
.unwrap(classOf[FlinkContext])
val watermarkRexNode = flinkContext
.getSqlExprToRexConverterFactory
- .create(scan.getTable.getRowType)
+ .create(scan.getTable.getRowType, null)
.convertToRexNode("rowtime - INTERVAL '10' SECOND")
relBuilder.push(scan)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
index 2c2c4ef..11c0d02 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -29,12 +28,16 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.parse.CalciteParser;
import org.apache.flink.table.parse.ExtendedParser;
import org.apache.flink.table.sqlexec.SqlToOperationConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.advise.SqlAdvisor;
import org.apache.calcite.sql.advise.SqlAdvisorValidator;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -105,7 +108,8 @@ public class ParserImpl implements Parser {
}
@Override
- public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ public ResolvedExpression parseSqlExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType
outputType) {
+ // Stub: this implementation always throws and ignores every argument, including the
+ // new nullable outputType.
throw new UnsupportedOperationException(
"Computed columns is only supported by the Blink planner.");
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 8984550..0cadaef 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -44,6 +44,7 @@ import org.apache.flink.table.parse.CalciteParser
import org.apache.flink.table.planner.{ParserImpl,
PlanningConfigurationBuilder}
import org.apache.flink.table.sinks.{BatchSelectTableSink, BatchTableSink,
OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink,
TableSink, TableSinkUtils}
import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
import org.apache.flink.table.types.{AbstractDataType, DataType}
import org.apache.flink.table.util.JavaScalaConversionUtil
import org.apache.flink.table.utils.PrintUtils
@@ -112,7 +113,10 @@ abstract class TableEnvImpl(
catalogManager.getDataTypeFactory,
tableLookup,
new SqlExpressionResolver {
- override def resolveExpression(sqlExpression: String, inputSchema:
TableSchema)
+ override def resolveExpression(
+ sqlExpression: String,
+ inputRowType: RowType,
+ outputType: LogicalType)
: ResolvedExpression = {
throw new UnsupportedOperationException(
"SQL expression parsing is only supported in the Blink planner.")
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
index c24775f..c8cf4fa 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java
@@ -70,9 +70,11 @@ public class OutputConversionOperator extends
TableStreamOperator<Object>
final RowData rowData = element.getValue();
if (consumeRowtimeMetadata) {
+ // timestamp is TIMESTAMP_LTZ
final long rowtime = rowData.getTimestamp(rowData.getArity() - 1,
3).getMillisecond();
outRecord.setTimestamp(rowtime);
} else if (rowtimeIndex != -1) {
+ // timestamp might be TIMESTAMP or TIMESTAMP_LTZ
final long rowtime = rowData.getTimestamp(rowtimeIndex,
3).getMillisecond();
outRecord.setTimestamp(rowtime);
}