This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new f621ab665cc [FLINK-38682][table-planner] Support unknown -> RAW cast
during type inference in limited validator scope
f621ab665cc is described below
commit f621ab665cc8c93bd95084750361a235aad2c4ea
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Nov 17 12:41:20 2025 +0100
[FLINK-38682][table-planner] Support unknown -> RAW cast during type
inference in limited validator scope
---
.../inference/TypeInferenceOperandChecker.java | 32 +++++++++--
.../planner/runtime/batch/sql/FunctionITCase.java | 64 +++++++++++++++++++++-
2 files changed, 91 insertions(+), 5 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
index 467e2178ccc..e20c2c07548 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
@@ -19,10 +19,12 @@
package org.apache.flink.table.planner.functions.inference;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.type.SqlRawTypeNameSpec;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
@@ -32,16 +34,20 @@ import
org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlTypeNameSpec;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -240,10 +246,28 @@ public final class TypeInferenceOperandChecker
/** Adopted from {@link
org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */
private SqlNode castTo(SqlNode node, RelDataType type) {
- return SqlStdOperatorTable.CAST.createCall(
- SqlParserPos.ZERO,
- node,
-
SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable()));
+ final SqlDataTypeSpec dataType;
+ if (type instanceof RawRelDataType) {
+ dataType = createRawDataTypeSpec((RawRelDataType) type);
+ } else {
+ dataType =
SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable());
+ }
+
+ return SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node,
dataType);
+ }
+
+ private SqlDataTypeSpec createRawDataTypeSpec(RawRelDataType type) {
+ final RawType<?> rawType = type.getRawType();
+
+ SqlNode className =
+ SqlLiteral.createCharString(
+ rawType.getOriginatingClass().getName(),
SqlParserPos.ZERO);
+ SqlNode serializer =
+ SqlLiteral.createCharString(rawType.getSerializerString(),
SqlParserPos.ZERO);
+
+ SqlTypeNameSpec rawSpec = new SqlRawTypeNameSpec(className,
serializer, SqlParserPos.ZERO);
+
+ return new SqlDataTypeSpec(rawSpec, null, type.isNullable(),
SqlParserPos.ZERO);
}
/** Adopted from {@link
org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
index e07ec1372da..a85cf50c50c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.planner.runtime.batch.sql;
+import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.functions.ScalarFunction;
import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.types.Row;
@@ -99,6 +101,49 @@ class FunctionITCase extends BatchTestBase {
testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
}
+ @Test
+ void testOrderByScopeRawTypeCast() throws Exception {
+ final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3),
Row.of(4), Row.of(5));
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' =
'COLLECTION')");
+ tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' =
'COLLECTION')");
+
+ tEnv().createTemporarySystemFunction("CustomIntUdf", new
CustomIntUdf());
+
+ tEnv().executeSql(
+ "INSERT INTO Sink"
+ + " SELECT i FROM Source"
+ + " ORDER BY CustomIntUdf(NULL)")
+ .await();
+
+ assertThat(TestCollectionTableFactory.getResult()).hasSize(5);
+ }
+
+ @Test
+ void testHavingScopeRawTypeCast() throws Exception {
+ final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3),
Row.of(4), Row.of(5));
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' =
'COLLECTION')");
+ tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' =
'COLLECTION')");
+
+ tEnv().createTemporarySystemFunction("CustomIntUdf", new
CustomIntUdf());
+
+ tEnv().executeSql(
+ "INSERT INTO Sink"
+ + " SELECT SUM(i) AS s FROM Source"
+ + " HAVING CustomIntUdf(NULL) = 0")
+ .await();
+
+ assertThat(TestCollectionTableFactory.getResult())
+ .singleElement()
+ .asString()
+ .contains("15");
+ }
+
private void testUserDefinedFunctionByUsingJar(String createFunctionDDL,
String dropFunctionDDL)
throws Exception {
List<Row> sourceData =
@@ -123,7 +168,7 @@ class FunctionITCase extends BatchTestBase {
Table t2 = tEnv().sqlQuery(query);
t2.executeInsert("t2").await();
- List<Row> result = TestCollectionTableFactory.RESULT();
+ List<Row> result = TestCollectionTableFactory.getResult();
List<Row> expected =
Arrays.asList(
Row.of(1, "jark"),
@@ -139,4 +184,21 @@ class FunctionITCase extends BatchTestBase {
// delete the function
tEnv().executeSql(dropFunctionDDL);
}
+
+ // ----- Test types / UDF -----
+
+ @DataTypeHint(value = "RAW", bridgedTo = CustomInt.class)
+ public static class CustomInt {
+ public Integer value;
+
+ public CustomInt(Integer v) {
+ this.value = v;
+ }
+ }
+
+ public static class CustomIntUdf extends ScalarFunction {
+ public Integer eval(CustomInt v) {
+ return 0;
+ }
+ }
}