twalthr commented on a change in pull request #18611:
URL: https://github.com/apache/flink/pull/18611#discussion_r806925460
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CollectionToCollectionCastRule.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * This class provides default implementation for {@link #canFail(LogicalType, LogicalType)} for
+ * composite types, e.g. ARRAY to ARRAY (but not ARRAY to STRING).
+ */
+interface CollectionToCollectionCastRule<IN, OUT> extends CastRule<IN, OUT> {

Review comment:
   call this `ConstructedToConstructedCastRule` to match the SQL terminology and `LogicalTypeFamily`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
##########
@@ -107,6 +108,16 @@ public static boolean exists(LogicalType inputType, LogicalType targetType) {
         return resolve(inputType, targetType) != null;
     }
 
+    /**
+     * Resolves the rule and returns the result of {@link CastRule#canFail(LogicalType,
+     * LogicalType)}. Fails with {@link NullPointerException} if the rule cannot be resolved.
+     */
+    public static boolean canFail(LogicalType inputType, LogicalType targetType) {
+        return Preconditions.checkNotNull(
+                        resolve(inputType, targetType), "Cast rule cannot be resolved")
+                .canFail(inputType, targetType);

Review comment:
   I can just repeat my previous feedback: if the resolution stored the input and target type in the CastRule instance, we wouldn't need to pass the same arguments twice.
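   As an illustration of that suggestion, a minimal sketch of a rule bound to its types at resolution time; all names here are hypothetical and not part of the PR:

       import org.apache.flink.table.types.logical.LogicalType;

       /** Hypothetical: a cast rule bound to its input/target types by resolve(). */
       final class ResolvedCastRule {
           private final CastRule<?, ?> rule;
           private final LogicalType inputType;
           private final LogicalType targetType;

           ResolvedCastRule(CastRule<?, ?> rule, LogicalType inputType, LogicalType targetType) {
               this.rule = rule;
               this.inputType = inputType;
               this.targetType = targetType;
           }

           /** Callers no longer repeat the two types on every delegated call. */
           boolean canFail() {
               return rule.canFail(inputType, targetType);
           }
       }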
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlCastFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
+
+/**
+ * This class implements the {@code TRY_CAST} built-in, essentially delegating all the method
+ * invocations, whenever is possible, to Calcite's {@link SqlCastFunction}.
+ */
+@Internal
+public class SqlTryCastFunction extends BuiltInSqlFunction {
+
+    /**
+     * Note that this constructor is mimicking as much as possible the constructor of Calcite's
+     * {@link SqlCastFunction}.
+     */
+    SqlTryCastFunction() {
+        super(
+                "TRY_CAST",
+                DEFAULT_VERSION,
+                SqlKind.OTHER_FUNCTION,
+                null,
+                SqlStdOperatorTable.CAST
+                        .getOperandTypeInference(), // From Calcite's SqlCastFunction
+                null,

Review comment:
   We should try to have a complete copy of `CAST`. E.g. `CAST` defines `InferTypes.FIRST_KNOWN`; the monotonicity property can also be adopted.
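   A sketch of how the constructor could mirror `CAST` more directly, assuming `BuiltInSqlFunction` keeps the parameter order shown in the diff above; `InferTypes.FIRST_KNOWN` is the operand type inference that Calcite's `SqlCastFunction` defines (it would need `import org.apache.calcite.sql.type.InferTypes;`):

       SqlTryCastFunction() {
           super(
                   "TRY_CAST",
                   DEFAULT_VERSION,
                   SqlKind.OTHER_FUNCTION,
                   null,
                   InferTypes.FIRST_KNOWN, // exactly what Calcite's CAST defines
                   null,
                   SqlFunctionCategory.SYSTEM,
                   true,
                   false,
                   SqlStdOperatorTable.CAST::getMonotonicity); // adopt CAST's monotonicity
       }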
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/TryCastConverter.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
+
+/**
+ * Conversion for {@link BuiltInFunctionDefinitions#TRY_CAST}.
+ *
+ * <p>We need this custom converter as {@link FunctionDefinitionConvertRule} doesn't support type
+ * literal arguments.
+ */
+@Internal
+class TryCastConverter extends CustomizedConverter {
+
+    @Override
+    public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
+        checkArgumentNumber(call, 2);
+
+        final RexNode child = context.toRexNode(call.getChildren().get(0));
+        final TypeLiteralExpression targetType = (TypeLiteralExpression) call.getChildren().get(1);
+
+        final LogicalType fromType = FlinkTypeFactory.toLogicalType(child.getType());
+        final LogicalType toType = targetType.getOutputDataType().getLogicalType();
+
+        // We need to adjust the type nullability here, as in table-common we cannot implement it
+        // correctly because we cannot access CastRuleProvider#canFail
+        RelDataType targetRelDataType =
+                context.getTypeFactory().createFieldTypeFromLogicalType(toType);
+        if (CastRuleProvider.canFail(fromType, toType)) {

Review comment:
   both casts can share the same parameterized `CastConverter`

##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1529,6 +1529,20 @@ public void testSetReset() {
         sql("RESET 'test-key'").ok("RESET 'test-key'");
     }
 
+    @Test
+    public void testTryCast() {
+        // Note that is expected that the unparsed value has the comma rather than AS, because we

Review comment:
   is this comment still valid? I don't see a comma in this test?

##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1529,6 +1529,20 @@ public void testSetReset() {
         sql("RESET 'test-key'").ok("RESET 'test-key'");
     }
 
+    @Test
+    public void testTryCast() {
+        // Note that is expected that the unparsed value has the comma rather than AS, because we
+        // don't use a custom SqlNode for TryCast, but we rely on SqlBasicCall
+
+        // Simple types
+        expr("try_cast(a as timestamp)").ok("TRY_CAST(`A` AS TIMESTAMP)");
+        expr("try_cast('abc' as timestamp)").ok("TRY_CAST('abc' AS TIMESTAMP)");
+
+        // Complex types
+        expr("try_cast(a as row(f0 int, f1 varchar))")

Review comment:
   this is a type from Calcite, can we also try Flink complex types, e.g.
   `TRY_CAST(f0 AS ROW<a ARRAY<INT>, b MAP<STRING, DECIMAL(10, 2)>>)`
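   A test case along those lines could look as follows; this is a sketch only, and the expected unparse string (quoting, spacing) is a guess that would need to be confirmed against the parser's actual output:

       // Flink complex types (expected output below is illustrative)
       expr("try_cast(f0 as row<a array<int>, b map<string, decimal(10, 2)>>)")
               .ok("TRY_CAST(`F0` AS ROW< `A` ARRAY< INTEGER >, `B` MAP< STRING, DECIMAL(10, 2) >>)");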
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlCastFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
+
+/**
+ * This class implements the {@code TRY_CAST} built-in, essentially delegating all the method
+ * invocations, whenever is possible, to Calcite's {@link SqlCastFunction}.
+ */
+@Internal
+public class SqlTryCastFunction extends BuiltInSqlFunction {
+
+    /**
+     * Note that this constructor is mimicking as much as possible the constructor of Calcite's
+     * {@link SqlCastFunction}.
+     */
+    SqlTryCastFunction() {
+        super(
+                "TRY_CAST",
+                DEFAULT_VERSION,
+                SqlKind.OTHER_FUNCTION,
+                null,
+                SqlStdOperatorTable.CAST
+                        .getOperandTypeInference(), // From Calcite's SqlCastFunction
+                null,
+                SqlFunctionCategory.SYSTEM,
+                true,
+                false,
+                SqlStdOperatorTable.CAST::getMonotonicity);
+    }
+
+    @Override
+    public String getSignatureTemplate(final int operandsCount) {
+        return SqlStdOperatorTable.CAST.getSignatureTemplate(operandsCount);
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlStdOperatorTable.CAST.getOperandCountRange();
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+        return SqlStdOperatorTable.CAST.checkOperandTypes(callBinding, throwOnFailure);
+    }
+
+    @Override
+    public SqlSyntax getSyntax() {
+        return SqlSyntax.SPECIAL;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+        // Taken from SqlCastFunction, but using the name of this operator
+        assert call.operandCount() == 2;
+        final SqlWriter.Frame frame = writer.startFunCall(getName());
+        call.operand(0).unparse(writer, 0, 0);
+        writer.sep("AS");
+        if (call.operand(1) instanceof SqlIntervalQualifier) {
+            writer.sep("INTERVAL");
+        }
+        call.operand(1).unparse(writer, 0, 0);
+        writer.endFunCall(frame);
+    }
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+        RelDataType returnType = SqlStdOperatorTable.CAST.inferReturnType(opBinding);
+
+        final LogicalType fromLogicalType =

Review comment:
   as mentioned before, let's simplify the logic and don't switch to Flink's type system.

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
##########
@@ -117,7 +117,7 @@ private FlinkAssertions() {}
      * .hasMessageContaining(containsMessage));
      * }</pre>
      */
-    public static ThrowingConsumer<? extends Throwable> anyCauseMatches(String containsMessage) {
+    public static ThrowingConsumer<? super Throwable> anyCauseMatches(String containsMessage) {

Review comment:
   put this into a separate commit, because it is a core change
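   For background on why `? super` is the right bound here: a consumer of `Throwable` values must be contravariant in its type parameter (the producer-extends/consumer-super rule). A self-contained illustration with plain `java.util.function.Consumer` instead of Flink's `ThrowingConsumer`:

       import java.util.function.Consumer;

       class VarianceDemo {
           // With "? super Throwable", Consumer<Throwable> and Consumer<Object>
           // both qualify, and accept(t) is always safe to call.
           static void feed(Consumer<? super Throwable> consumer, Throwable t) {
               consumer.accept(t);
           }

           public static void main(String[] args) {
               Consumer<Object> logAnything = System.out::println;
               feed(logAnything, new RuntimeException("boom"));
               // With the old bound "? extends Throwable", accept(t) inside feed()
               // would not even compile, since the actual type argument is unknown.
           }
       }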
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql2rel.SqlRexContext;
+import org.apache.calcite.sql2rel.SqlRexConvertlet;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.Collections;
+
+/**
+ * Custom Flink {@link SqlRexConvertletTable} to add custom {@link SqlNode} to {@link RexNode}
+ * conversions.
+ */
+@Internal
+public class FlinkConvertletTable implements SqlRexConvertletTable {
+
+    public static final FlinkConvertletTable INSTANCE = new FlinkConvertletTable();
+
+    private FlinkConvertletTable() {}
+
+    @Override
+    public SqlRexConvertlet get(SqlCall call) {
+        if (call.getOperator().isName("TRY_CAST", false)) {
+            return this::convertTryCast;
+        }
+        return StandardConvertletTable.INSTANCE.get(call);
+    }
+
+    // Slightly modified version of StandardConvertletTable::convertCast
+    private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
+        RelDataTypeFactory typeFactory = cx.getTypeFactory();
+        final SqlNode leftNode = call.operand(0);
+        final SqlNode rightNode = call.operand(1);
+
+        final RexNode valueRex = cx.convertExpression(leftNode);
+
+        RelDataType type;
+        if (rightNode instanceof SqlIntervalQualifier) {
+            type = typeFactory.createSqlIntervalType((SqlIntervalQualifier) rightNode);
+        } else if (rightNode instanceof SqlDataTypeSpec) {
+            SqlDataTypeSpec dataType = ((SqlDataTypeSpec) rightNode);
+            type = dataType.deriveType(cx.getValidator());
+            if (type == null) {
+                type = cx.getValidator().getValidatedNodeType(dataType.getTypeName());
+            }
+        } else {
+            throw new IllegalStateException(
+                    "Invalid right argument type for TRY_CAST: " + rightNode);
+        }
+
+        final LogicalType fromLogicalType = FlinkTypeFactory.toLogicalType(valueRex.getType());

Review comment:
   Let's simplify the logic in the stack. It is ok if `TRY_CAST` always returns NULLABLE. Users can simply use `CAST` if they don't expect an error. It also makes sure that there is no mismatch between Table API type inference and SQL.
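   A minimal sketch of that simplification inside `convertTryCast`, showing only the return-type handling; it relies on Calcite's standard `RelDataTypeFactory#createTypeWithNullability`, with no `CastRuleProvider#canFail` lookup and no round-trip through Flink's `LogicalType` (the variables `typeFactory`, `type`, `valueRex` and `cx` are the ones from the diff above):

       // TRY_CAST always produces a nullable type, regardless of whether the
       // concrete cast rule can fail at runtime.
       final RelDataType nullableType = typeFactory.createTypeWithNullability(type, true);
       return cx.getRexBuilder().makeAbstractCast(nullableType, valueRex);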
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -469,18 +469,28 @@ public OutType collect() {
     }
 
     /**
-     * Converts a value to a given data type.
+     * Returns a new value being cast to {@code toType}. A cast error throws an exception and fails
+     * the job. If you're performing a cast operation that may fail, like {@link DataTypes#INT()} to
+     * {@link DataTypes#STRING()}, you should rather use {@link #tryCast(DataType)}, in order to
+     * handle errors. If {@code table.exec.sink.legacy-cast-behaviour} is enabled, this function

Review comment:
   link to config option directly

##########
File path: docs/data/sql_functions.yml
##########
@@ -561,7 +561,10 @@ conditional:
 conversion:
   - sql: CAST(value AS type)
     table: ANY.cast(TYPE)
-    description: Returns a new value being cast to type type. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR.
+    description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. If you're performing a cast operation that may fail, like INT to STRING, you should rather use TRY_CAST, in order to handle errors. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR; TRY_CAST('non-number' AS INT) throws an exception and fails the job.

Review comment:
   remove `TRY_`

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1240,17 +1240,17 @@ private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object targe
             return this;
         }
 
-        private CastTestSpecBuilder failTableApi(DataType dataType, Object src) {
-            return fail(TestType.ERROR_TABLE_API, dataType, src);
+        private CastTestSpecBuilder failTableApiValidation(DataType dataType, Object src) {
+            return failValidation(TestType.ERROR_TABLE_API, dataType, src);
         }
 
-        private CastTestSpecBuilder failSQL(DataType dataType, Object src) {
-            return fail(TestType.ERROR_TABLE_API, dataType, src);
+        private CastTestSpecBuilder failSQLValidation(DataType dataType, Object src) {

Review comment:
   use `failSqlValidation` camel case

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToRowCastRule.java
##########
@@ -224,4 +224,13 @@ protected String generateCodeBlockInternal(
         writer.stmt(methodCall(writerTerm, "complete")).assignStmt(returnVariable, rowTerm);
         return writer.toString();
     }
+
+    @Override
+    public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {

Review comment:
   implement `CollectionToCollectionCastRule` here as well?

##########
File path: flink-python/pyflink/table/expression.py
##########
@@ -837,12 +837,28 @@ def alias(self, name: str, *extra_names: str) -> 'Expression[T]':
 
     def cast(self, data_type: DataType) -> 'Expression':
         """
-        Converts a value to a given data type.
+        Returns a new value being cast to type type.
+        A cast error throws an exception and fails the job.
+        If you're performing a cast operation that may fail, like INT to STRING,

Review comment:
   for the future: use `one` instead of `you`, or no person like: `When performing a cast operation that may fail, use ...`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
