This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ad229f9b65e [FLINK-38964][table] Reuse Calcite's `SqlValidatorImpl#maybeCast`
ad229f9b65e is described below
commit ad229f9b65e423559cdede78005f309cc4ad5b31
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jan 26 23:34:46 2026 +0100
[FLINK-38964][table] Reuse Calcite's `SqlValidatorImpl#maybeCast`
---
.../calcite/sql/validate/SqlValidatorImpl.java | 2 +-
.../planner/calcite/FlinkCalciteSqlValidator.java | 5 +++
.../planner/operations/SqlNodeConvertContext.java | 4 +--
.../operations/converters/MergeTableAsUtil.java | 9 +++--
.../operations/converters/SqlNodeConverter.java | 3 +-
.../planner/calcite/PreValidateReWriter.scala | 20 +++--------
.../table/planner/calcite/SqlRewriterUtils.scala | 39 ++--------------------
7 files changed, 21 insertions(+), 61 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 57dc5e740f1..6635637888a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -776,7 +776,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
return 0;
}
- private SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
+ protected SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
return SqlTypeUtil.equalSansNullability(typeFactory, currentType, desiredType)
? node
: SqlStdOperatorTable.CAST.createCall(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 648c951ebef..a65cb882a43 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -431,6 +431,11 @@ public final class FlinkCalciteSqlValidator extends FlinkSqlParsingValidator {
return rewritten;
}
+ @Override
+ public SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
+ return super.maybeCast(node, currentType, desiredType);
+ }
+
//
--------------------------------------------------------------------------------------------
// Column expansion
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 687a8ce483a..05364388fba 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
@@ -36,7 +37,6 @@ import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.validate.SqlValidator;
import javax.annotation.Nullable;
@@ -63,7 +63,7 @@ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
}
@Override
- public SqlValidator getSqlValidator() {
+ public FlinkCalciteSqlValidator getSqlValidator() {
return flinkPlanner.getOrCreateSqlValidator();
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
index 3264e3c392a..2b26585d6f4 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
@@ -67,12 +67,12 @@ import java.util.stream.IntStream;
/** A utility class with logic for handling the {@code CREATE TABLE ... AS SELECT} clause. */
public class MergeTableAsUtil {
- private final SqlValidator validator;
+ private final FlinkCalciteSqlValidator validator;
private final Function<SqlNode, String> escapeExpression;
private final DataTypeFactory dataTypeFactory;
public MergeTableAsUtil(
- SqlValidator validator,
+ FlinkCalciteSqlValidator validator,
Function<SqlNode, String> escapeExpression,
DataTypeFactory dataTypeFactory) {
this.validator = validator;
@@ -135,11 +135,10 @@ public class MergeTableAsUtil {
assignedFields.put(
pos,
- rewriterUtils.maybeCast(
+ validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
-
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
- typeFactory));
+
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
} else {
targetPositions.add(sourceFields.get(targetField.getName()));
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index 2c54d78c516..23f8f1716b2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.types.DataType;
@@ -79,7 +80,7 @@ public interface SqlNodeConverter<S extends SqlNode> {
TableConfig getTableConfig();
/** Returns the {@link SqlValidator} in the convert context. */
- SqlValidator getSqlValidator();
+ FlinkCalciteSqlValidator getSqlValidator();
/** Returns the {@link CatalogManager} in the convert context. */
CatalogManager getCatalogManager();
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 8be16db924f..b6e61385d36 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -17,30 +17,24 @@
*/
package org.apache.flink.table.planner.calcite
-import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.sql.parser.dql.SqlRichExplain
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable,
FlinkPreparingTableBase, LegacyCatalogSourceTable}
-import org.apache.flink.util.Preconditions.checkArgument
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory,
RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier,
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef,
SqlUtil}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode,
SqlNodeList, SqlTableRef, SqlUtil}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException,
SqlValidatorTable, SqlValidatorUtil}
import org.apache.calcite.util.Static.RESOURCE
import java.util
-import java.util.Collections
import scala.collection.JavaConversions._
@@ -153,11 +147,7 @@ object PreValidateReWriter {
val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
assignedFields.put(
targetField.getIndex,
- rewriterUtils.maybeCast(
- value,
- value.createSqlType(typeFactory),
- targetField.getType,
- typeFactory))
+ validator.maybeCast(value, value.createSqlType(typeFactory), targetField.getType))
}
// validate partial insert columns.
@@ -205,11 +195,11 @@ object PreValidateReWriter {
validateField(idx => !assignedFields.contains(idx), id,
targetField)
assignedFields.put(
targetField.getIndex,
- rewriterUtils.maybeCast(
+ validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
- targetField.getType,
- typeFactory)
+ targetField.getType
+ )
)
} else {
// handle reorder
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
index 9bcc9361f14..7326268e75e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
@@ -17,18 +17,15 @@
*/
package org.apache.flink.table.planner.calcite
-import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.table.api.ValidationException
import
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.ExplicitTableSqlSelect
import
org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall,
rewriteSqlSelect, rewriteSqlValues, rewriteSqlWith}
import org.apache.flink.util.Preconditions.checkArgument
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.validate.SqlValidatorException
import org.apache.calcite.util.Static.RESOURCE
@@ -79,38 +76,6 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
targetPosition,
unsupportedErrorMessage)
}
-
- // This code snippet is copied from the SqlValidatorImpl.
- def maybeCast(
- node: SqlNode,
- currentType: RelDataType,
- desiredType: RelDataType,
- typeFactory: RelDataTypeFactory): SqlNode = {
- if (
- currentType == desiredType
- || (currentType.isNullable != desiredType.isNullable
- && typeFactory.createTypeWithNullability(currentType,
desiredType.isNullable)
- == desiredType)
- ) {
- node
- } else {
- // See FLINK-26460 for more details
- val sqlDataTypeSpec =
- if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType))
{
- val keyType = desiredType.getKeyType
- val valueType = desiredType.getValueType
- new SqlDataTypeSpec(
- new SqlMapTypeNameSpec(
-
SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
-
SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
- SqlParserPos.ZERO),
- SqlParserPos.ZERO)
- } else {
- SqlTypeUtil.convertTypeToSpec(desiredType)
- }
- SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node,
sqlDataTypeSpec)
- }
- }
}
object SqlRewriterUtils {