dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436503474
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
##########
@@ -54,4 +56,14 @@
* @throws org.apache.flink.table.api.SqlParserException when failed to
parse the identifier
*/
UnresolvedIdentifier parseIdentifier(String identifier);
+
+ /**
+ * Entry point for parse sql expression expressed as a String.
Review comment:
```suggestion
* Entry point for parsing SQL expressions expressed as a String.
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String
identifier) {
SqlIdentifier sqlIdentifier =
parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+ List<String> fieldNames =
Arrays.asList(inputSchema.getFieldNames());
+ List<LogicalType> fieldTypes =
Arrays.stream(inputSchema.getFieldDataTypes())
+
.map(LogicalTypeDataTypeConverter::toLogicalType)
+ .collect(Collectors.toList());
+ RelDataType inputType = typeFactory.buildRelNodeRowType(
Review comment:
Could we change this method so that we do not need the `typeFactory`?
Could we e.g. instead of `Supplier<SqlExprToRexConverterFactory>` pass a
`Function<TableSchema, SqlExprToRexConverter`?
I'd prefer to limit the number of different cross dependencies unless
strictly necessary.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String
identifier) {
SqlIdentifier sqlIdentifier =
parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+ List<String> fieldNames =
Arrays.asList(inputSchema.getFieldNames());
+ List<LogicalType> fieldTypes =
Arrays.stream(inputSchema.getFieldDataTypes())
+
.map(LogicalTypeDataTypeConverter::toLogicalType)
Review comment:
Why not `DataType::getLogicalType`? Why do we even need the
`LogicalTypeDataTypeConverter::toLogicalType`?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
##########
@@ -78,4 +80,10 @@ public UnresolvedIdentifier parseIdentifier(String
identifier) {
SqlIdentifier sqlIdentifier =
parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ // do not support for old planner
+ return null;
Review comment:
Lets throw an exception instead.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String
identifier) {
SqlIdentifier sqlIdentifier =
parser.parseIdentifier(identifier);
return UnresolvedIdentifier.of(sqlIdentifier.names);
}
+
+ @Override
+ public ResolvedExpression parseSqlExpression(String sqlExpression,
TableSchema inputSchema) {
+ FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+ List<String> fieldNames =
Arrays.asList(inputSchema.getFieldNames());
+ List<LogicalType> fieldTypes =
Arrays.stream(inputSchema.getFieldDataTypes())
+
.map(LogicalTypeDataTypeConverter::toLogicalType)
+ .collect(Collectors.toList());
+ RelDataType inputType = typeFactory.buildRelNodeRowType(
+
JavaScalaConversionUtil$.MODULE$.toScala(fieldNames),
Review comment:
Could we rather add a method to the `FlinkTypeFactory` that could work
with Java? This hack really does not look nice.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##########
@@ -76,7 +77,9 @@ class TableSourceTest extends TableTestBase {
}
@Test
- def testProcTimeTableSourceSimple(): Unit = {
+ def testProctimeOnWatermarkSpec(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("proctime can't be defined on watermark spec.")
Review comment:
Shouldn't the message be rather: `Watermark can not be defined for a
processing time attribute column?`
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
}
}
+ public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver
schemaResolver) {
Review comment:
How about we pass it in the ctor? It is not an optional mutable
parameter.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive correct result
type of computed column,
+ * because the date type of computed column from catalog table is not trusted.
+ *
+ * <p>Such as `proctime()` function, its type in given TableSchema is
Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+ private final Parser parser;
+ private final boolean isStreamingMode;
+
+ public CatalogTableSchemaResolver(Parser parser, boolean
isStreamingMode) {
+ this.parser = parser;
+ this.isStreamingMode = isStreamingMode;
+ }
+
+ /**
+ * Resolve the computed column's type for the given schema.
+ *
+ * @param tableSchema Table schema to derive table field names and data
types
+ * @return the resolved TableSchema
+ */
+ public TableSchema resolve(TableSchema tableSchema) {
+ final String rowtime;
+ if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+ // TODO: [FLINK-14473] we only support top-level
rowtime attribute right now
Review comment:
Not a comment for this issue/PR, but rather for the `WatermarkSpec`, but
this is very error prone imo to use a single string for a rowtime attribute if
it is supposed to handle nested columns.
Just as an example. This will break if the dot was escaped.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive correct result
type of computed column,
+ * because the date type of computed column from catalog table is not trusted.
+ *
+ * <p>Such as `proctime()` function, its type in given TableSchema is
Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+ public final Parser parser;
+
+ public CatalogTableSchemaResolver(Parser parser) {
+ this.parser = parser;
+ }
+
+ /**
+ * Resolve the computed column's type for the given schema.
+ *
+ * @param tableSchema Table schema to derive table field names and data
types
+ * @param isStreamingMode Flag to determine whether the schema of a
stream or batch table is created
+ * @return the resolved TableSchema
+ */
+ public TableSchema resolve(TableSchema tableSchema, boolean
isStreamingMode) {
+ final String rowtime;
+ if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+ // TODO: [FLINK-14473] we only support top-level
rowtime attribute right now
+ rowtime =
tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+ if (rowtime.contains(".")) {
+ throw new ValidationException(
+ String.format("Nested field
'%s' as rowtime attribute is not supported right now.", rowtime));
+ }
+ } else {
+ rowtime = null;
+ }
+
+ String[] fieldNames = tableSchema.getFieldNames();
+ DataType[] fieldTypes =
Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+ for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+ TableColumn tableColumn =
tableSchema.getTableColumns().get(i);
+ if (tableColumn.isGenerated() &&
isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+ if (fieldNames[i].equals(rowtime)) {
+ throw new TableException("proctime
can't be defined on watermark spec.");
+ }
+ TimestampType originalType = (TimestampType)
fieldTypes[i].getLogicalType();
+ LogicalType proctimeType = new TimestampType(
+ originalType.isNullable(),
+ TimestampKind.PROCTIME,
+ originalType.getPrecision());
+ fieldTypes[i] =
TypeConversions.fromLogicalToDataType(proctimeType);
+ } else if (isStreamingMode &&
fieldNames[i].equals(rowtime)) {
Review comment:
Yes, that is also my thinking that a Table, including schema, should be
the same in both cases, and possibly handled differently in the planner.
Do you think we could create a JIRA ticket to somehow track this effort? If
we cannot remove it currently?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]