This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ad229f9b65e [FLINK-38964][table] Reuse Calcite's 
`SqlValidatorImpl#maybeCast`
ad229f9b65e is described below

commit ad229f9b65e423559cdede78005f309cc4ad5b31
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jan 26 23:34:46 2026 +0100

    [FLINK-38964][table] Reuse Calcite's `SqlValidatorImpl#maybeCast`
---
 .../calcite/sql/validate/SqlValidatorImpl.java     |  2 +-
 .../planner/calcite/FlinkCalciteSqlValidator.java  |  5 +++
 .../planner/operations/SqlNodeConvertContext.java  |  4 +--
 .../operations/converters/MergeTableAsUtil.java    |  9 +++--
 .../operations/converters/SqlNodeConverter.java    |  3 +-
 .../planner/calcite/PreValidateReWriter.scala      | 20 +++--------
 .../table/planner/calcite/SqlRewriterUtils.scala   | 39 ++--------------------
 7 files changed, 21 insertions(+), 61 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 57dc5e740f1..6635637888a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -776,7 +776,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         return 0;
     }
 
-    private SqlNode maybeCast(SqlNode node, RelDataType currentType, 
RelDataType desiredType) {
+    protected SqlNode maybeCast(SqlNode node, RelDataType currentType, 
RelDataType desiredType) {
         return SqlTypeUtil.equalSansNullability(typeFactory, currentType, 
desiredType)
                 ? node
                 : SqlStdOperatorTable.CAST.createCall(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 648c951ebef..a65cb882a43 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -431,6 +431,11 @@ public final class FlinkCalciteSqlValidator extends 
FlinkSqlParsingValidator {
         return rewritten;
     }
 
+    @Override
+    public SqlNode maybeCast(SqlNode node, RelDataType currentType, 
RelDataType desiredType) {
+        return super.maybeCast(node, currentType, desiredType);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Column expansion
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 687a8ce483a..05364388fba 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
@@ -36,7 +37,6 @@ import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.validate.SqlValidator;
 
 import javax.annotation.Nullable;
 
@@ -63,7 +63,7 @@ public class SqlNodeConvertContext implements 
SqlNodeConverter.ConvertContext {
     }
 
     @Override
-    public SqlValidator getSqlValidator() {
+    public FlinkCalciteSqlValidator getSqlValidator() {
         return flinkPlanner.getOrCreateSqlValidator();
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
index 3264e3c392a..2b26585d6f4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
@@ -67,12 +67,12 @@ import java.util.stream.IntStream;
 
 /** A utility class with logic for handling the {@code CREATE TABLE ... AS 
SELECT} clause. */
 public class MergeTableAsUtil {
-    private final SqlValidator validator;
+    private final FlinkCalciteSqlValidator validator;
     private final Function<SqlNode, String> escapeExpression;
     private final DataTypeFactory dataTypeFactory;
 
     public MergeTableAsUtil(
-            SqlValidator validator,
+            FlinkCalciteSqlValidator validator,
             Function<SqlNode, String> escapeExpression,
             DataTypeFactory dataTypeFactory) {
         this.validator = validator;
@@ -135,11 +135,10 @@ public class MergeTableAsUtil {
 
                 assignedFields.put(
                         pos,
-                        rewriterUtils.maybeCast(
+                        validator.maybeCast(
                                 SqlLiteral.createNull(SqlParserPos.ZERO),
                                 typeFactory.createUnknownType(),
-                                
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
-                                typeFactory));
+                                
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
             } else {
                 targetPositions.add(sourceFields.get(targetField.getName()));
             }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index 2c54d78c516..23f8f1716b2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.types.DataType;
@@ -79,7 +80,7 @@ public interface SqlNodeConverter<S extends SqlNode> {
         TableConfig getTableConfig();
 
         /** Returns the {@link SqlValidator} in the convert context. */
-        SqlValidator getSqlValidator();
+        FlinkCalciteSqlValidator getSqlValidator();
 
         /** Returns the {@link CatalogManager} in the convert context. */
         CatalogManager getCatalogManager();
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 8be16db924f..b6e61385d36 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -17,30 +17,24 @@
  */
 package org.apache.flink.table.planner.calcite
 
-import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.sql.parser.dql.SqlRichExplain
 import org.apache.flink.table.api.ValidationException
 import 
org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects,
 notSupported}
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, 
FlinkPreparingTableBase, LegacyCatalogSourceTable}
-import org.apache.flink.util.Preconditions.checkArgument
 
 import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, 
RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, 
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, 
SqlUtil}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, 
SqlNodeList, SqlTableRef, SqlUtil}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, 
SqlValidatorTable, SqlValidatorUtil}
 import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
-import java.util.Collections
 
 import scala.collection.JavaConversions._
 
@@ -153,11 +147,7 @@ object PreValidateReWriter {
       val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
       assignedFields.put(
         targetField.getIndex,
-        rewriterUtils.maybeCast(
-          value,
-          value.createSqlType(typeFactory),
-          targetField.getType,
-          typeFactory))
+        validator.maybeCast(value, value.createSqlType(typeFactory), 
targetField.getType))
     }
 
     // validate partial insert columns.
@@ -205,11 +195,11 @@ object PreValidateReWriter {
             validateField(idx => !assignedFields.contains(idx), id, 
targetField)
             assignedFields.put(
               targetField.getIndex,
-              rewriterUtils.maybeCast(
+              validator.maybeCast(
                 SqlLiteral.createNull(SqlParserPos.ZERO),
                 typeFactory.createUnknownType(),
-                targetField.getType,
-                typeFactory)
+                targetField.getType
+              )
             )
           } else {
             // handle reorder
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
index 9bcc9361f14..7326268e75e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala
@@ -17,18 +17,15 @@
  */
 package org.apache.flink.table.planner.calcite
 
-import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
 import org.apache.flink.table.api.ValidationException
 import 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.ExplicitTableSqlSelect
 import 
org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, 
rewriteSqlSelect, rewriteSqlValues, rewriteSqlWith}
 import org.apache.flink.util.Preconditions.checkArgument
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlDataTypeSpec, 
SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, 
SqlWith}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlNode, 
SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.SqlValidatorException
 import org.apache.calcite.util.Static.RESOURCE
 
@@ -79,38 +76,6 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
       targetPosition,
       unsupportedErrorMessage)
   }
-
-  // This code snippet is copied from the SqlValidatorImpl.
-  def maybeCast(
-      node: SqlNode,
-      currentType: RelDataType,
-      desiredType: RelDataType,
-      typeFactory: RelDataTypeFactory): SqlNode = {
-    if (
-      currentType == desiredType
-      || (currentType.isNullable != desiredType.isNullable
-        && typeFactory.createTypeWithNullability(currentType, 
desiredType.isNullable)
-        == desiredType)
-    ) {
-      node
-    } else {
-      // See FLINK-26460 for more details
-      val sqlDataTypeSpec =
-        if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) 
{
-          val keyType = desiredType.getKeyType
-          val valueType = desiredType.getValueType
-          new SqlDataTypeSpec(
-            new SqlMapTypeNameSpec(
-              
SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
-              
SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
-              SqlParserPos.ZERO),
-            SqlParserPos.ZERO)
-        } else {
-          SqlTypeUtil.convertTypeToSpec(desiredType)
-        }
-      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, 
sqlDataTypeSpec)
-    }
-  }
 }
 
 object SqlRewriterUtils {

Reply via email to