twalthr commented on a change in pull request #18611:
URL: https://github.com/apache/flink/pull/18611#discussion_r806925460



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CollectionToCollectionCastRule.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * This class provides default implementation for {@link #canFail(LogicalType, LogicalType)} for
+ * composite types, e.g. ARRAY to ARRAY (but not ARRAY to STRING).
+ */
+interface CollectionToCollectionCastRule<IN, OUT> extends CastRule<IN, OUT> {

Review comment:
       call this `ConstructedToConstructedCastRule` to match the SQL terminology and `LogicalTypeFamily`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
##########
@@ -107,6 +108,16 @@ public static boolean exists(LogicalType inputType, LogicalType targetType) {
         return resolve(inputType, targetType) != null;
     }
 
+    /**
+     * Resolves the rule and returns the result of {@link CastRule#canFail(LogicalType,
+     * LogicalType)}. Fails with {@link NullPointerException} if the rule cannot be resolved.
+     */
+    public static boolean canFail(LogicalType inputType, LogicalType targetType) {
+        return Preconditions.checkNotNull(
+                        resolve(inputType, targetType), "Cast rule cannot be resolved")
+                .canFail(inputType, targetType);

Review comment:
       I can just repeat my previous feedback: if the resolution stored the input and target type in the CastRule instance, we wouldn't need to pass the same arguments twice.
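
       For illustration, a minimal sketch of that idea (`ResolvedCastRule` and its fields are hypothetical names, not code from this PR):

```java
import org.apache.flink.table.types.logical.LogicalType;

// Hypothetical: a rule bound to the types it was resolved with, so callers
// don't repeat the arguments that were already used for resolution.
final class ResolvedCastRule {
    private final CastRule<?, ?> rule; // as returned by CastRuleProvider#resolve
    private final LogicalType inputType;
    private final LogicalType targetType;

    ResolvedCastRule(CastRule<?, ?> rule, LogicalType inputType, LogicalType targetType) {
        this.rule = rule;
        this.inputType = inputType;
        this.targetType = targetType;
    }

    boolean canFail() {
        // The types are already stored; no second pass of the same arguments.
        return rule.canFail(inputType, targetType);
    }
}
```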

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlCastFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
+
+/**
+ * This class implements the {@code TRY_CAST} built-in, essentially delegating all the method
+ * invocations, whenever is possible, to Calcite's {@link SqlCastFunction}.
+ */
+@Internal
+public class SqlTryCastFunction extends BuiltInSqlFunction {
+
+    /**
+     * Note that this constructor is mimicking as much as possible the constructor of Calcite's
+     * {@link SqlCastFunction}.
+     */
+    SqlTryCastFunction() {
+        super(
+                "TRY_CAST",
+                DEFAULT_VERSION,
+                SqlKind.OTHER_FUNCTION,
+                null,
+                SqlStdOperatorTable.CAST
+                        .getOperandTypeInference(), // From Calcite's SqlCastFunction
+                null,

Review comment:
       We should try to have a complete copy of `CAST`. E.g. `CAST` defines `InferTypes.FIRST_KNOWN`; the monotonicity property can also be adopted.
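
       For reference, a hedged sketch of how the constructor above could adopt both properties directly (argument positions follow the hunk shown here; `InferTypes.FIRST_KNOWN` comes from `org.apache.calcite.sql.type.InferTypes`):

```java
super(
        "TRY_CAST",
        DEFAULT_VERSION,
        SqlKind.OTHER_FUNCTION,
        null,
        InferTypes.FIRST_KNOWN, // what Calcite's SqlCastFunction declares explicitly
        null,
        SqlFunctionCategory.SYSTEM,
        true,
        false,
        SqlStdOperatorTable.CAST::getMonotonicity); // adopt CAST's monotonicity as well
```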

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/TryCastConverter.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
+
+/**
+ * Conversion for {@link BuiltInFunctionDefinitions#TRY_CAST}.
+ *
+ * <p>We need this custom converter as {@link FunctionDefinitionConvertRule} doesn't support type
+ * literal arguments.
+ */
+@Internal
+class TryCastConverter extends CustomizedConverter {
+
+    @Override
+    public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
+        checkArgumentNumber(call, 2);
+
+        final RexNode child = context.toRexNode(call.getChildren().get(0));
+        final TypeLiteralExpression targetType = (TypeLiteralExpression) call.getChildren().get(1);
+
+        final LogicalType fromType = FlinkTypeFactory.toLogicalType(child.getType());
+        final LogicalType toType = targetType.getOutputDataType().getLogicalType();
+
+        // We need to adjust the type nullability here, as in table-common we cannot implement it
+        // correctly because we cannot access CastRuleProvider#canFail
+        RelDataType targetRelDataType =
+                context.getTypeFactory().createFieldTypeFromLogicalType(toType);
+        if (CastRuleProvider.canFail(fromType, toType)) {

Review comment:
       both casts can share the same parameterized `CastConverter`
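
       A hypothetical sketch of such a shared converter (class and field names are illustrative; using `SqlStdOperatorTable.CAST` for the non-try variant is an assumption, not code from this PR):

```java
// One converter for both CAST and TRY_CAST; only the operator differs.
class CastConverter extends CustomizedConverter {

    private final SqlOperator operator; // e.g. FlinkSqlOperatorTable.TRY_CAST or SqlStdOperatorTable.CAST

    CastConverter(SqlOperator operator) {
        this.operator = operator;
    }

    @Override
    public RexNode convert(CallExpression call, CallExpressionConvertRule.ConvertContext context) {
        checkArgumentNumber(call, 2);
        final RexNode child = context.toRexNode(call.getChildren().get(0));
        final TypeLiteralExpression targetType = (TypeLiteralExpression) call.getChildren().get(1);
        final RelDataType targetRelDataType =
                context.getTypeFactory()
                        .createFieldTypeFromLogicalType(
                                targetType.getOutputDataType().getLogicalType());
        return context.getRelBuilder()
                .getRexBuilder()
                .makeCall(targetRelDataType, operator, Collections.singletonList(child));
    }
}
```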

##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1529,6 +1529,20 @@ public void testSetReset() {
         sql("RESET 'test-key'").ok("RESET 'test-key'");
     }
 
+    @Test
+    public void testTryCast() {
+        // Note that is expected that the unparsed value has the comma rather than AS, because we

Review comment:
       Is this comment still valid? I don't see a comma in this test.

##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1529,6 +1529,20 @@ public void testSetReset() {
         sql("RESET 'test-key'").ok("RESET 'test-key'");
     }
 
+    @Test
+    public void testTryCast() {
+        // Note that is expected that the unparsed value has the comma rather than AS, because we
+        // don't use a custom SqlNode for TryCast, but we rely on SqlBasicCall
+
+        // Simple types
+        expr("try_cast(a as timestamp)").ok("TRY_CAST(`A` AS TIMESTAMP)");
+        expr("try_cast('abc' as timestamp)").ok("TRY_CAST('abc' AS 
TIMESTAMP)");
+
+        // Complex types
+        expr("try_cast(a as row(f0 int, f1 varchar))")

Review comment:
       this is a type from Calcite; can we also try Flink complex types, e.g.
   `TRY_CAST(f0 AS ROW<a ARRAY<INT>, b MAP<STRING, DECIMAL(10, 2)>>)`?
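
       For example, something along these lines could be added (the expected unparse string below is a guess and would need to be checked against the parser's actual output):

```java
expr("try_cast(f0 as row<a array<int>, b map<string, decimal(10, 2)>>)")
        .ok("TRY_CAST(`F0` AS ROW< `A` ARRAY< INTEGER >, `B` MAP< STRING, DECIMAL(10, 2) >>)");
```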

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTryCastFunction.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlCastFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
+
+/**
+ * This class implements the {@code TRY_CAST} built-in, essentially delegating all the method
+ * invocations, whenever is possible, to Calcite's {@link SqlCastFunction}.
+ */
+@Internal
+public class SqlTryCastFunction extends BuiltInSqlFunction {
+
+    /**
+     * Note that this constructor is mimicking as much as possible the constructor of Calcite's
+     * {@link SqlCastFunction}.
+     */
+    SqlTryCastFunction() {
+        super(
+                "TRY_CAST",
+                DEFAULT_VERSION,
+                SqlKind.OTHER_FUNCTION,
+                null,
+                SqlStdOperatorTable.CAST
+                        .getOperandTypeInference(), // From Calcite's SqlCastFunction
+                null,
+                SqlFunctionCategory.SYSTEM,
+                true,
+                false,
+                SqlStdOperatorTable.CAST::getMonotonicity);
+    }
+
+    @Override
+    public String getSignatureTemplate(final int operandsCount) {
+        return SqlStdOperatorTable.CAST.getSignatureTemplate(operandsCount);
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlStdOperatorTable.CAST.getOperandCountRange();
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+        return SqlStdOperatorTable.CAST.checkOperandTypes(callBinding, throwOnFailure);
+    }
+
+    @Override
+    public SqlSyntax getSyntax() {
+        return SqlSyntax.SPECIAL;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+        // Taken from SqlCastFunction, but using the name of this operator
+        assert call.operandCount() == 2;
+        final SqlWriter.Frame frame = writer.startFunCall(getName());
+        call.operand(0).unparse(writer, 0, 0);
+        writer.sep("AS");
+        if (call.operand(1) instanceof SqlIntervalQualifier) {
+            writer.sep("INTERVAL");
+        }
+        call.operand(1).unparse(writer, 0, 0);
+        writer.endFunCall(frame);
+    }
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+        RelDataType returnType = SqlStdOperatorTable.CAST.inferReturnType(opBinding);
+
+        final LogicalType fromLogicalType =

Review comment:
       as mentioned before, let's simplify the logic and not switch to Flink's type system.
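
       A hedged sketch of the simplified method, staying entirely in Calcite's type system (this assumes the "always nullable" behavior suggested for `TRY_CAST` elsewhere in this review):

```java
@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
    final RelDataType returnType = SqlStdOperatorTable.CAST.inferReturnType(opBinding);
    // Mark the result nullable instead of consulting Flink's CastRuleProvider.
    return opBinding.getTypeFactory().createTypeWithNullability(returnType, true);
}
```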

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
##########
@@ -117,7 +117,7 @@ private FlinkAssertions() {}
      *                  .hasMessageContaining(containsMessage));
      * }</pre>
      */
-    public static ThrowingConsumer<? extends Throwable> anyCauseMatches(String containsMessage) {
+    public static ThrowingConsumer<? super Throwable> anyCauseMatches(String containsMessage) {

Review comment:
       put this into a separate commit, because it is a core change

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql2rel.SqlRexContext;
+import org.apache.calcite.sql2rel.SqlRexConvertlet;
+import org.apache.calcite.sql2rel.SqlRexConvertletTable;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+
+import java.util.Collections;
+
+/**
+ * Custom Flink {@link SqlRexConvertletTable} to add custom {@link SqlNode} to {@link RexNode}
+ * conversions.
+ */
+@Internal
+public class FlinkConvertletTable implements SqlRexConvertletTable {
+
+    public static final FlinkConvertletTable INSTANCE = new FlinkConvertletTable();
+
+    private FlinkConvertletTable() {}
+
+    @Override
+    public SqlRexConvertlet get(SqlCall call) {
+        if (call.getOperator().isName("TRY_CAST", false)) {
+            return this::convertTryCast;
+        }
+        return StandardConvertletTable.INSTANCE.get(call);
+    }
+
+    // Slightly modified version of StandardConvertletTable::convertCast
+    private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
+        RelDataTypeFactory typeFactory = cx.getTypeFactory();
+        final SqlNode leftNode = call.operand(0);
+        final SqlNode rightNode = call.operand(1);
+
+        final RexNode valueRex = cx.convertExpression(leftNode);
+
+        RelDataType type;
+        if (rightNode instanceof SqlIntervalQualifier) {
+            type = typeFactory.createSqlIntervalType((SqlIntervalQualifier) rightNode);
+        } else if (rightNode instanceof SqlDataTypeSpec) {
+            SqlDataTypeSpec dataType = ((SqlDataTypeSpec) rightNode);
+            type = dataType.deriveType(cx.getValidator());
+            if (type == null) {
+                type = cx.getValidator().getValidatedNodeType(dataType.getTypeName());
+            }
+        } else {
+            throw new IllegalStateException(
+                    "Invalid right argument type for TRY_CAST: " + rightNode);
+        }
+
+        final LogicalType fromLogicalType = FlinkTypeFactory.toLogicalType(valueRex.getType());

Review comment:
       Let's simplify the logic in the stack. It is ok if `TRY_CAST` always returns NULLABLE. Users can simply use `CAST` if they don't expect an error. It also makes sure that there is no mismatch between Table API type inference and SQL.
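
       A minimal sketch of that simplification inside `convertTryCast` (reusing `typeFactory`, `type`, and `valueRex` from the snippet above; `FlinkSqlOperatorTable.TRY_CAST` is the operator this PR introduces):

```java
// Always produce a nullable type; no CastRuleProvider lookup, no Flink types.
final RelDataType nullableType = typeFactory.createTypeWithNullability(type, true);
return cx.getRexBuilder()
        .makeCall(nullableType, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex));
```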

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -469,18 +469,28 @@ public OutType collect() {
     }
 
     /**
-     * Converts a value to a given data type.
+     * Returns a new value being cast to {@code toType}. A cast error throws an exception and fails
+     * the job. If you're performing a cast operation that may fail, like {@link DataTypes#INT()} to
+     * {@link DataTypes#STRING()}, you should rather use {@link #tryCast(DataType)}, in order to
+     * handle errors. If {@code table.exec.sink.legacy-cast-behaviour} is enabled, this function

Review comment:
       link to config option directly
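
       For example (assuming the constant lives in `org.apache.flink.table.api.config.TableConfigOptions`, as the rest of the codebase suggests):

```java
/**
 * ... If {@link TableConfigOptions#TABLE_EXEC_SINK_LEGACY_CAST_BEHAVIOUR} is enabled,
 * this function behaves like the legacy cast. ...
 */
```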

##########
File path: docs/data/sql_functions.yml
##########
@@ -561,7 +561,10 @@ conditional:
 conversion:
   - sql: CAST(value AS type)
     table: ANY.cast(TYPE)
-    description: Returns a new value being cast to type type. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR.
+    description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. If you're performing a cast operation that may fail, like INT to STRING, you should rather use TRY_CAST, in order to handle errors. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS VARCHAR) returns NULL of type VARCHAR; TRY_CAST('non-number' AS INT) throws an exception and fails the job.

Review comment:
       remove `TRY_`

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1240,17 +1240,17 @@ private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object targe
             return this;
         }
 
-        private CastTestSpecBuilder failTableApi(DataType dataType, Object src) {
-            return fail(TestType.ERROR_TABLE_API, dataType, src);
+        private CastTestSpecBuilder failTableApiValidation(DataType dataType, Object src) {
+            return failValidation(TestType.ERROR_TABLE_API, dataType, src);
         }
 
-        private CastTestSpecBuilder failSQL(DataType dataType, Object src) {
-            return fail(TestType.ERROR_TABLE_API, dataType, src);
+        private CastTestSpecBuilder failSQLValidation(DataType dataType, Object src) {

Review comment:
       use `failSqlValidation` camel case

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToRowCastRule.java
##########
@@ -224,4 +224,13 @@ protected String generateCodeBlockInternal(
         writer.stmt(methodCall(writerTerm, "complete")).assignStmt(returnVariable, rowTerm);
         return writer.toString();
     }
+
+    @Override
+    public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {

Review comment:
       implement `CollectionToCollectionCastRule` here as well?
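
       A sketch of what that could look like (the superclass name is assumed from the surrounding file, not part of this hunk):

```java
class RowToRowCastRule extends AbstractNullAwareCodeGeneratorCastRule<RowData, RowData>
        implements CollectionToCollectionCastRule<RowData, RowData> {
    // generateCodeBlockInternal(...) stays unchanged; canFail(...) can then be
    // dropped in favor of the interface's default implementation.
}
```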

##########
File path: flink-python/pyflink/table/expression.py
##########
@@ -837,12 +837,28 @@ def alias(self, name: str, *extra_names: str) -> 'Expression[T]':
 
     def cast(self, data_type: DataType) -> 'Expression':
         """
-        Converts a value to a given data type.
+        Returns a new value being cast to type type.
+        A cast error throws an exception and fails the job.
+        If you're performing a cast operation that may fail, like INT to STRING,

Review comment:
       for the future: use `one` instead of `you`, or no person at all, e.g. `When performing a cast operation that may fail, use...`



