This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb6c61f373 [FLINK-37850][table] Several `UNION`, `EXCEPT`, 
`INTERSECT` with one wrong column type fail with `UnsupportedOperationException`
bcb6c61f373 is described below

commit bcb6c61f373c9ac2b507cd1ac629b5e27a7a547e
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue May 27 12:22:10 2025 +0200

    [FLINK-37850][table] Several `UNION`, `EXCEPT`, `INTERSECT` with one wrong 
column type fail with `UnsupportedOperationException`
---
 .../main/java/org/apache/calcite/sql/SqlUtil.java  | 1312 ++++++++++++++++++++
 .../table/planner/plan/stream/sql/UnionTest.scala  |   17 +
 2 files changed, 1329 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlUtil.java
new file mode 100644
index 00000000000..814cf606650
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlUtil.java
@@ -0,0 +1,1312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import com.google.common.base.Predicates;
+import com.google.common.base.Utf8;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypePrecedenceList;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.BarfingInvocationHandler;
+import org.apache.calcite.util.ConversionUtil;
+import org.apache.calcite.util.Glossary;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.PolyNull;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.charset.UnsupportedCharsetException;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/**
+ * Flink modification because of CALCITE-7027
+ *
+ * <p>Lines 825 ~ 831, should be removed after upgrading to 1.40.
+ */
+public abstract class SqlUtil {
+    // ~ Constants 
--------------------------------------------------------------
+
+    /**
+     * Prefix for generated column aliases. Ends with '$' so that 
human-written queries are unlikely
+     * to accidentally reference the generated name.
+     */
+    public static final String GENERATED_EXPR_ALIAS_PREFIX = "EXPR$";
+
+    // ~ Methods 
----------------------------------------------------------------
+
+    /**
+     * Returns the AND of two expressions.
+     *
+     * <p>If {@code node1} is null, returns {@code node2}. Flattens if either 
node is an AND.
+     */
+    public static SqlNode andExpressions(@Nullable SqlNode node1, SqlNode 
node2) {
+        if (node1 == null) {
+            return node2;
+        }
+        ArrayList<SqlNode> list = new ArrayList<>();
+        if (node1.getKind() == SqlKind.AND) {
+            list.addAll(((SqlCall) node1).getOperandList());
+        } else {
+            list.add(node1);
+        }
+        if (node2.getKind() == SqlKind.AND) {
+            list.addAll(((SqlCall) node2).getOperandList());
+        } else {
+            list.add(node2);
+        }
+        return SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO, list);
+    }
+
+    static ArrayList<SqlNode> flatten(SqlNode node) {
+        ArrayList<SqlNode> list = new ArrayList<>();
+        flatten(node, list);
+        return list;
+    }
+
+    /** Returns the <code>n</code>th (0-based) input to a join expression. */
+    public static SqlNode getFromNode(SqlSelect query, int ordinal) {
+        SqlNode from = query.getFrom();
+        assert from != null : "from must not be null for " + query;
+        ArrayList<SqlNode> list = flatten(from);
+        return list.get(ordinal);
+    }
+
+    private static void flatten(SqlNode node, ArrayList<SqlNode> list) {
+        switch (node.getKind()) {
+            case JOIN:
+                SqlJoin join = (SqlJoin) node;
+                flatten(join.getLeft(), list);
+                flatten(join.getRight(), list);
+                return;
+            case AS:
+                SqlCall call = (SqlCall) node;
+                flatten(call.operand(0), list);
+                return;
+            default:
+                list.add(node);
+                return;
+        }
+    }
+
+    /** Converts a SqlNode array to a SqlNodeList. */
+    public static SqlNodeList toNodeList(SqlNode[] operands) {
+        SqlNodeList ret = new SqlNodeList(SqlParserPos.ZERO);
+        for (SqlNode node : operands) {
+            ret.add(node);
+        }
+        return ret;
+    }
+
+    /**
+     * Finds the index of an expression in a list, comparing using {@link
+     * SqlNode#equalsDeep(SqlNode, Litmus)}.
+     */
+    public static int indexOfDeep(List<? extends SqlNode> list, SqlNode e, 
Litmus litmus) {
+        for (int i = 0; i < list.size(); i++) {
+            if (e.equalsDeep(list.get(i), litmus)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Returns whether a node represents the NULL value.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>For {@link SqlLiteral} Unknown, returns false.
+     *   <li>For <code>CAST(NULL AS <i>type</i>)</code>, returns true if <code>
+     * allowCast</code> is true, false otherwise.
+     *   <li>For <code>CAST(CAST(NULL AS <i>type</i>) AS <i>type</i>))</code>, 
returns false.
+     * </ul>
+     */
+    public static boolean isNullLiteral(@Nullable SqlNode node, boolean 
allowCast) {
+        if (node instanceof SqlLiteral) {
+            SqlLiteral literal = (SqlLiteral) node;
+            if (literal.getTypeName() == SqlTypeName.NULL) {
+                assert null == literal.getValue();
+                return true;
+            } else {
+                // We don't regard UNKNOWN -- SqlLiteral(null,Boolean) -- as
+                // NULL.
+                return false;
+            }
+        }
+        if (allowCast && node != null) {
+            if (node.getKind() == SqlKind.CAST) {
+                SqlCall call = (SqlCall) node;
+                if (isNullLiteral(call.operand(0), false)) {
+                    // node is "CAST(NULL as type)"
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns whether a node represents the NULL value or a series of nested 
<code>
+     * CAST(NULL AS type)</code> calls. For example: <code>
+     * isNull(CAST(CAST(NULL as INTEGER) AS VARCHAR(1)))</code> returns {@code 
true}.
+     */
+    public static boolean isNull(SqlNode node) {
+        return isNullLiteral(node, false)
+                || node.getKind() == SqlKind.CAST && isNull(((SqlCall) 
node).operand(0));
+    }
+
+    /**
+     * Returns whether a node is a literal.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>For <code>CAST(literal AS <i>type</i>)</code>, returns true if 
<code>
+     * allowCast</code> is true, false otherwise.
+     *   <li>For <code>CAST(CAST(literal AS <i>type</i>) AS 
<i>type</i>))</code>, returns false.
+     * </ul>
+     *
+     * @param node The node, never null.
+     * @param allowCast whether to regard CAST(literal) as a literal
+     * @return Whether the node is a literal
+     */
+    public static boolean isLiteral(SqlNode node, boolean allowCast) {
+        assert node != null;
+        if (node instanceof SqlLiteral) {
+            return true;
+        }
+        if (!allowCast) {
+            return false;
+        }
+        switch (node.getKind()) {
+            case CAST:
+                // "CAST(e AS type)" is literal if "e" is literal
+                return isLiteral(((SqlCall) node).operand(0), true);
+            case MAP_VALUE_CONSTRUCTOR:
+            case ARRAY_VALUE_CONSTRUCTOR:
+                return ((SqlCall) node).getOperandList().stream().allMatch(o 
-> isLiteral(o, true));
+            case DEFAULT:
+                return true; // DEFAULT is always NULL
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns whether a node is a literal.
+     *
+     * <p>Many constructs which require literals also accept <code>CAST(NULL AS
+     * <i>type</i>)</code>. This method does not accept casts, so you should 
call {@link
+     * #isNullLiteral} first.
+     *
+     * @param node The node, never null.
+     * @return Whether the node is a literal
+     */
+    public static boolean isLiteral(SqlNode node) {
+        return isLiteral(node, false);
+    }
+
+    /**
+     * Returns whether a node is a literal chain which is used to represent a 
continued string
+     * literal.
+     *
+     * @param node The node, never null.
+     * @return Whether the node is a literal chain
+     */
+    public static boolean isLiteralChain(SqlNode node) {
+        assert node != null;
+        if (node instanceof SqlCall) {
+            SqlCall call = (SqlCall) node;
+            return call.getKind() == SqlKind.LITERAL_CHAIN;
+        } else {
+            return false;
+        }
+    }
+
+    @Deprecated // to be removed before 2.0
+    public static void unparseFunctionSyntax(SqlOperator operator, SqlWriter 
writer, SqlCall call) {
+        unparseFunctionSyntax(operator, writer, call, false);
+    }
+
+    /**
+     * Unparses a call to an operator that has function syntax.
+     *
+     * @param operator The operator
+     * @param writer Writer
+     * @param call List of 0 or more operands
+     * @param ordered Whether argument list may end with ORDER BY
+     */
+    public static void unparseFunctionSyntax(
+            SqlOperator operator, SqlWriter writer, SqlCall call, boolean 
ordered) {
+        if (operator instanceof SqlFunction) {
+            SqlFunction function = (SqlFunction) operator;
+
+            if (function.getFunctionType().isSpecific()) {
+                writer.keyword("SPECIFIC");
+            }
+            SqlIdentifier id = function.getSqlIdentifier();
+            if (id == null) {
+                writer.keyword(operator.getName());
+            } else {
+                unparseSqlIdentifierSyntax(writer, id, true);
+            }
+        } else {
+            writer.print(operator.getName());
+        }
+        if (call.operandCount() == 0) {
+            switch (call.getOperator().getSyntax()) {
+                case FUNCTION_ID:
+                    // For example, the "LOCALTIME" function appears as 
"LOCALTIME"
+                    // when it has 0 args, not "LOCALTIME()".
+                    return;
+                case FUNCTION_STAR: // E.g. "COUNT(*)"
+                case FUNCTION: // E.g. "RANK()"
+                case ORDERED_FUNCTION: // E.g. "STRING_AGG(x)"
+                    // fall through - dealt with below
+                    break;
+                default:
+                    break;
+            }
+        }
+        final SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+        final SqlLiteral quantifier = call.getFunctionQuantifier();
+        if (quantifier != null) {
+            quantifier.unparse(writer, 0, 0);
+        }
+        if (call.operandCount() == 0) {
+            switch (call.getOperator().getSyntax()) {
+                case FUNCTION_STAR:
+                    writer.sep("*");
+                    break;
+                default:
+                    break;
+            }
+        }
+        for (SqlNode operand : call.getOperandList()) {
+            if (ordered && operand instanceof SqlNodeList) {
+                writer.sep("ORDER BY");
+            } else if (ordered && operand.getKind() == SqlKind.SEPARATOR) {
+                writer.sep("SEPARATOR");
+                ((SqlCall) operand).operand(0).unparse(writer, 0, 0);
+                continue;
+            } else {
+                writer.sep(",");
+            }
+            operand.unparse(writer, 0, 0);
+        }
+
+        writer.endList(frame);
+    }
+
+    /**
+     * Unparse a SqlIdentifier syntax.
+     *
+     * @param writer Writer
+     * @param identifier SqlIdentifier
+     * @param asFunctionID Whether this identifier comes from a SqlFunction
+     */
+    public static void unparseSqlIdentifierSyntax(
+            SqlWriter writer, SqlIdentifier identifier, boolean asFunctionID) {
+        final boolean isUnquotedSimple =
+                identifier.isSimple() && 
!identifier.getParserPosition().isQuoted();
+        final SqlOperator operator =
+                isUnquotedSimple
+                        ? SqlValidatorUtil.lookupSqlFunctionByID(
+                                SqlStdOperatorTable.instance(), identifier, 
null)
+                        : null;
+        boolean unparsedAsFunc = false;
+        final SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.IDENTIFIER);
+        if (isUnquotedSimple && operator != null) {
+            // Unparse conditions:
+            // 1. If the identifier is quoted or is component, unparse as 
normal.
+            // 2. If the identifier comes from a sql function, lookup in the
+            // standard sql operator table to see if the function is a builtin,
+            // unparse without quoting for builtins.
+
+            // 3. If the identifier does not come from a function(resolved as 
a SqlIdentifier),
+            // look up in the standard sql operator table to see if it is a 
function
+            // with empty argument list, e.g. LOCALTIME, we should not quote
+            // such identifier cause quoted `LOCALTIME` always represents a 
sql identifier.
+            if (asFunctionID || operator.getSyntax() == SqlSyntax.FUNCTION_ID) 
{
+                writer.keyword(identifier.getSimple());
+                unparsedAsFunc = true;
+            }
+        }
+
+        if (!unparsedAsFunc) {
+            for (int i = 0; i < identifier.names.size(); i++) {
+                writer.sep(".");
+                final String name = identifier.names.get(i);
+                final SqlParserPos pos = 
identifier.getComponentParserPosition(i);
+                if (name.equals("")) {
+                    writer.print("*");
+                    writer.setNeedWhitespace(true);
+                } else {
+                    writer.identifier(name, pos.isQuoted());
+                }
+            }
+        }
+        if (null != identifier.getCollation()) {
+            identifier.getCollation().unparse(writer);
+        }
+        writer.endList(frame);
+    }
+
+    public static void unparseBinarySyntax(
+            SqlOperator operator, SqlCall call, SqlWriter writer, int 
leftPrec, int rightPrec) {
+        assert call.operandCount() == 2;
+        final SqlWriter.Frame frame =
+                writer.startList(
+                        (operator instanceof SqlSetOperator)
+                                ? SqlWriter.FrameTypeEnum.SETOP
+                                : SqlWriter.FrameTypeEnum.SIMPLE);
+        call.operand(0).unparse(writer, leftPrec, operator.getLeftPrec());
+        final boolean needsSpace = operator.needsSpace();
+        writer.setNeedWhitespace(needsSpace);
+        writer.sep(operator.getName());
+        writer.setNeedWhitespace(needsSpace);
+        call.operand(1).unparse(writer, operator.getRightPrec(), rightPrec);
+        writer.endList(frame);
+    }
+
+    /**
+     * Concatenates string literals.
+     *
+     * <p>This method takes an array of arguments, since pairwise 
concatenation means too much
+     * string copying.
+     *
+     * @param lits an array of {@link SqlLiteral}, not empty, all of the same 
class
+     * @return a new {@link SqlLiteral}, of that same class, whose value is 
the string concatenation
+     *     of the values of the literals
+     * @throws ClassCastException if the lits are not homogeneous.
+     * @throws ArrayIndexOutOfBoundsException if lits is an empty array.
+     */
+    public static SqlLiteral concatenateLiterals(List<SqlLiteral> lits) {
+        if (lits.size() == 1) {
+            return lits.get(0); // nothing to do
+        }
+        return ((SqlAbstractStringLiteral) lits.get(0)).concat1(lits);
+    }
+
+    /**
+     * Looks up a (possibly overloaded) routine based on name and argument 
types.
+     *
+     * @param opTab operator table to search
+     * @param typeFactory Type factory
+     * @param funcName name of function being invoked
+     * @param argTypes argument types
+     * @param argNames argument names, or null if call by position
+     * @param category whether a function or a procedure. (If a procedure is 
being invoked, the
+     *     overload rules are simpler.)
+     * @param nameMatcher Whether to look up the function case-sensitively
+     * @param coerce Whether to allow type coercion when do filter routines by 
parameter types
+     * @return matching routine, or null if none found
+     * @see Glossary#SQL99 SQL:1999 Part 2 Section 10.4
+     */
+    public static @Nullable SqlOperator lookupRoutine(
+            SqlOperatorTable opTab,
+            RelDataTypeFactory typeFactory,
+            SqlIdentifier funcName,
+            List<RelDataType> argTypes,
+            @Nullable List<String> argNames,
+            @Nullable SqlFunctionCategory category,
+            SqlSyntax syntax,
+            SqlKind sqlKind,
+            SqlNameMatcher nameMatcher,
+            boolean coerce) {
+        Iterator<SqlOperator> list =
+                lookupSubjectRoutines(
+                        opTab,
+                        typeFactory,
+                        funcName,
+                        argTypes,
+                        argNames,
+                        syntax,
+                        sqlKind,
+                        category,
+                        nameMatcher,
+                        coerce);
+        if (list.hasNext()) {
+            // return first on schema path
+            return list.next();
+        }
+        return null;
+    }
+
+    private static Iterator<SqlOperator> filterOperatorRoutinesByKind(
+            Iterator<SqlOperator> routines, final SqlKind sqlKind) {
+        return Iterators.filter(
+                routines,
+                operator -> Objects.requireNonNull(operator, 
"operator").getKind() == sqlKind);
+    }
+
+    /**
+     * Looks up all subject routines matching the given name and argument 
types.
+     *
+     * @param opTab operator table to search
+     * @param typeFactory Type factory
+     * @param funcName name of function being invoked
+     * @param argTypes argument types
+     * @param argNames argument names, or null if call by position
+     * @param sqlSyntax the SqlSyntax of the SqlOperator being looked up
+     * @param sqlKind the SqlKind of the SqlOperator being looked up
+     * @param category Category of routine to look up
+     * @param nameMatcher Whether to look up the function case-sensitively
+     * @param coerce Whether to allow type coercion when do filter routine by 
parameter types
+     * @return list of matching routines
+     * @see Glossary#SQL99 SQL:1999 Part 2 Section 10.4
+     */
+    public static Iterator<SqlOperator> lookupSubjectRoutines(
+            SqlOperatorTable opTab,
+            RelDataTypeFactory typeFactory,
+            SqlIdentifier funcName,
+            List<RelDataType> argTypes,
+            @Nullable List<String> argNames,
+            SqlSyntax sqlSyntax,
+            SqlKind sqlKind,
+            @Nullable SqlFunctionCategory category,
+            SqlNameMatcher nameMatcher,
+            boolean coerce) {
+        // start with all routines matching by name
+        Iterator<SqlOperator> routines =
+                lookupSubjectRoutinesByName(opTab, funcName, sqlSyntax, 
category, nameMatcher);
+
+        // first pass:  eliminate routines which don't accept the given
+        // number of arguments
+        routines = filterRoutinesByParameterCount(routines, argTypes);
+
+        // NOTE: according to SQL99, procedures are NOT overloaded on type,
+        // only on number of arguments.
+        if (category == SqlFunctionCategory.USER_DEFINED_PROCEDURE) {
+            return routines;
+        }
+
+        // second pass:  eliminate routines which don't accept the given
+        // argument types and parameter names if specified
+        routines =
+                filterRoutinesByParameterTypeAndName(
+                        typeFactory, sqlSyntax, routines, argTypes, argNames, 
coerce);
+
+        // see if we can stop now; this is necessary for the case
+        // of builtin functions where we don't have param type info,
+        // or UDF whose operands can make type coercion.
+        final List<SqlOperator> list = Lists.newArrayList(routines);
+        routines = list.iterator();
+        if (list.size() < 2 || coerce) {
+            return routines;
+        }
+
+        // third pass:  for each parameter from left to right, eliminate
+        // all routines except those with the best precedence match for
+        // the given arguments
+        routines =
+                filterRoutinesByTypePrecedence(
+                        sqlSyntax, typeFactory, routines, argTypes, argNames);
+
+        // fourth pass: eliminate routines which do not have the same
+        // SqlKind as requested
+        return filterOperatorRoutinesByKind(routines, sqlKind);
+    }
+
+    /**
+     * Determines whether there is a routine matching the given name and 
number of arguments.
+     *
+     * @param opTab operator table to search
+     * @param funcName name of function being invoked
+     * @param argTypes argument types
+     * @param category category of routine to look up
+     * @param nameMatcher Whether to look up the function case-sensitively
+     * @return true if match found
+     */
+    public static boolean matchRoutinesByParameterCount(
+            SqlOperatorTable opTab,
+            SqlIdentifier funcName,
+            List<RelDataType> argTypes,
+            SqlFunctionCategory category,
+            SqlNameMatcher nameMatcher) {
+        // start with all routines matching by name
+        Iterator<SqlOperator> routines =
+                lookupSubjectRoutinesByName(
+                        opTab, funcName, SqlSyntax.FUNCTION, category, 
nameMatcher);
+
+        // first pass:  eliminate routines which don't accept the given
+        // number of arguments
+        routines = filterRoutinesByParameterCount(routines, argTypes);
+
+        return routines.hasNext();
+    }
+
+    private static Iterator<SqlOperator> lookupSubjectRoutinesByName(
+            SqlOperatorTable opTab,
+            SqlIdentifier funcName,
+            final SqlSyntax syntax,
+            @Nullable SqlFunctionCategory category,
+            SqlNameMatcher nameMatcher) {
+        final List<SqlOperator> sqlOperators = new ArrayList<>();
+        opTab.lookupOperatorOverloads(funcName, category, syntax, 
sqlOperators, nameMatcher);
+        switch (syntax) {
+            case FUNCTION:
+                return Iterators.filter(
+                        sqlOperators.iterator(), 
Predicates.instanceOf(SqlFunction.class));
+            default:
+                return Iterators.filter(
+                        sqlOperators.iterator(),
+                        operator ->
+                                Objects.requireNonNull(operator, 
"operator").getSyntax() == syntax);
+        }
+    }
+
+    private static Iterator<SqlOperator> filterRoutinesByParameterCount(
+            Iterator<SqlOperator> routines, final List<RelDataType> argTypes) {
+        return Iterators.filter(
+                routines,
+                operator ->
+                        Objects.requireNonNull(operator, "operator")
+                                .getOperandCountRange()
+                                .isValidCount(argTypes.size()));
+    }
+
+    /**
+     * Filters an iterator of routines, keeping only those that have the 
required argument types and
+     * names.
+     *
+     * @see Glossary#SQL99 SQL:1999 Part 2 Section 10.4 Syntax Rule 6.b.iii.2.B
+     */
+    private static Iterator<SqlOperator> filterRoutinesByParameterTypeAndName(
+            RelDataTypeFactory typeFactory,
+            SqlSyntax syntax,
+            final Iterator<SqlOperator> routines,
+            final List<RelDataType> argTypes,
+            final @Nullable List<String> argNames,
+            final boolean coerce) {
+        if (syntax != SqlSyntax.FUNCTION) {
+            return routines;
+        }
+
+        //noinspection unchecked
+        return (Iterator)
+                Iterators.filter(
+                        Iterators.filter(routines, SqlFunction.class),
+                        function -> {
+                            SqlOperandTypeChecker operandTypeChecker =
+                                    Objects.requireNonNull(function, 
"function")
+                                            .getOperandTypeChecker();
+                            if (operandTypeChecker == null
+                                    || 
!operandTypeChecker.isFixedParameters()) {
+                                // no parameter information for builtins; keep 
for now,
+                                // the type coerce will not work here.
+                                return true;
+                            }
+                            final SqlOperandMetadata operandMetadata =
+                                    (SqlOperandMetadata) operandTypeChecker;
+                            @SuppressWarnings("assignment.type.incompatible")
+                            final List<@Nullable RelDataType> paramTypes =
+                                    operandMetadata.paramTypes(typeFactory);
+                            final List<@Nullable RelDataType> permutedArgTypes;
+                            if (argNames != null) {
+                                final List<String> paramNames = 
operandMetadata.paramNames();
+                                permutedArgTypes = permuteArgTypes(paramNames, 
argNames, argTypes);
+                                if (permutedArgTypes == null) {
+                                    return false;
+                                }
+                            } else {
+                                permutedArgTypes = 
Lists.newArrayList(argTypes);
+                                while (permutedArgTypes.size() < 
argTypes.size()) {
+                                    paramTypes.add(null);
+                                }
+                            }
+                            for (Pair<@Nullable RelDataType, @Nullable 
RelDataType> p :
+                                    Pair.zip(paramTypes, permutedArgTypes)) {
+                                final RelDataType argType = p.right;
+                                final RelDataType paramType = p.left;
+                                if (argType != null
+                                        && paramType != null
+                                        && !SqlTypeUtil.canCastFrom(paramType, 
argType, coerce)) {
+                                    return false;
+                                }
+                            }
+                            return true;
+                        });
+    }
+
+    /** Permutes argument types to correspond to the order of parameter names. 
*/
+    private static @Nullable List<@Nullable RelDataType> permuteArgTypes(
+            List<String> paramNames, List<String> argNames, List<RelDataType> 
argTypes) {
+        // Arguments passed by name. Make sure that the function has
+        // parameters of all of these names.
+        Map<Integer, Integer> map = new HashMap<>();
+        for (Ord<String> argName : Ord.zip(argNames)) {
+            int i = paramNames.indexOf(argName.e);
+            if (i < 0) {
+                return null;
+            }
+            map.put(i, argName.i);
+        }
+        return Functions.<@Nullable RelDataType>generate(
+                paramNames.size(),
+                index -> {
+                    Integer argIndex = map.get(index);
+                    return argIndex != null ? argTypes.get(argIndex) : null;
+                });
+    }
+
+    /**
+     * Filters an iterator of routines, keeping only those with the best match 
for the actual
+     * argument types.
+     *
+     * @see Glossary#SQL99 SQL:1999 Part 2 Section 9.4
+     */
+    private static Iterator<SqlOperator> filterRoutinesByTypePrecedence(
+            SqlSyntax sqlSyntax,
+            RelDataTypeFactory typeFactory,
+            Iterator<SqlOperator> routines,
+            List<RelDataType> argTypes,
+            @Nullable List<String> argNames) {
+        if (sqlSyntax != SqlSyntax.FUNCTION) {
+            return routines;
+        }
+
+        List<SqlFunction> sqlFunctions =
+                Lists.newArrayList(Iterators.filter(routines, 
SqlFunction.class));
+
+        for (final Ord<RelDataType> argType : Ord.zip(argTypes)) {
+            final RelDataTypePrecedenceList precList = 
argType.e.getPrecedenceList();
+            final RelDataType bestMatch =
+                    bestMatch(typeFactory, sqlFunctions, argType.i, argNames, 
precList);
+            if (bestMatch != null) {
+                sqlFunctions =
+                        sqlFunctions.stream()
+                                .filter(
+                                        function -> {
+                                            SqlOperandTypeChecker 
operandTypeChecker =
+                                                    
function.getOperandTypeChecker();
+                                            if (operandTypeChecker == null
+                                                    || 
!operandTypeChecker.isFixedParameters()) {
+                                                return false;
+                                            }
+                                            final SqlOperandMetadata 
operandMetadata =
+                                                    (SqlOperandMetadata) 
operandTypeChecker;
+                                            final List<String> paramNames =
+                                                    
operandMetadata.paramNames();
+                                            final List<RelDataType> paramTypes 
=
+                                                    
operandMetadata.paramTypes(typeFactory);
+                                            int index =
+                                                    argNames != null
+                                                            ? 
paramNames.indexOf(
+                                                                    
argNames.get(argType.i))
+                                                            : argType.i;
+                                            final RelDataType paramType = 
paramTypes.get(index);
+                                            return 
precList.compareTypePrecedence(
+                                                            paramType, 
bestMatch)
+                                                    >= 0;
+                                        })
+                                .collect(Collectors.toList());
+            }
+        }
+        //noinspection unchecked
+        return (Iterator) sqlFunctions.iterator();
+    }
+
+    private static @Nullable RelDataType bestMatch(
+            RelDataTypeFactory typeFactory,
+            List<SqlFunction> sqlFunctions,
+            int i,
+            @Nullable List<String> argNames,
+            RelDataTypePrecedenceList precList) {
+        RelDataType bestMatch = null;
+        for (SqlFunction function : sqlFunctions) {
+            SqlOperandTypeChecker operandTypeChecker = 
function.getOperandTypeChecker();
+            if (operandTypeChecker == null || 
!operandTypeChecker.isFixedParameters()) {
+                continue;
+            }
+            final SqlOperandMetadata operandMetadata = (SqlOperandMetadata) 
operandTypeChecker;
+            final List<RelDataType> paramTypes = 
operandMetadata.paramTypes(typeFactory);
+            final List<String> paramNames = operandMetadata.paramNames();
+            final RelDataType paramType =
+                    argNames != null
+                            ? 
paramTypes.get(paramNames.indexOf(argNames.get(i)))
+                            : paramTypes.get(i);
+            if (bestMatch == null) {
+                bestMatch = paramType;
+            } else {
+                int c = precList.compareTypePrecedence(bestMatch, paramType);
+                if (c < 0) {
+                    bestMatch = paramType;
+                }
+            }
+        }
+        return bestMatch;
+    }
+
+    /** Returns the <code>i</code>th select-list item of a query. */
+    public static SqlNode getSelectListItem(SqlNode query, int i) {
+        switch (query.getKind()) {
+            case SELECT:
+                SqlSelect select = (SqlSelect) query;
+                final SqlNode from = stripAs(select.getFrom());
+                if (from != null && from.getKind() == SqlKind.VALUES) {
+                    // They wrote "VALUES (x, y)", but the validator has
+                    // converted this into "SELECT * FROM VALUES (x, y)".
+                    return getSelectListItem(from, i);
+                }
+                final SqlNodeList fields = select.getSelectList();
+
+                assert fields != null : "fields must not be null in " + select;
+                // Range check the index to avoid index out of range.  This
+                // could be expanded to actually check to see if the select
+                // list is a "*"
+                if (i >= fields.size()) {
+                    i = 0;
+                }
+                return fields.get(i);
+
+            case VALUES:
+                SqlCall call = (SqlCall) query;
+                assert call.operandCount() > 0 : "VALUES must have at least 
one operand";
+                final SqlCall row = call.operand(0);
+                assert row.operandCount() > i : "VALUES has too few columns";
+                return row.operand(i);
+
+            // FLINK MODIFICATION BEGIN
+            case EXCEPT:
+            case INTERSECT:
+            case UNION:
+                final List<SqlNode> operandList = ((SqlBasicCall) 
query).getOperandList();
+                return getSelectListItem(operandList.get(0), i);
+            // FLINK MODIFICATION END
+
+            default:
+                // Unexpected type of query.
+                throw Util.needToImplement(query);
+        }
+    }
+
+    public static String deriveAliasFromOrdinal(int ordinal) {
+        return GENERATED_EXPR_ALIAS_PREFIX + ordinal;
+    }
+
+    /**
+     * Whether the alias is generated by calcite.
+     *
+     * @param alias not null
+     * @return true if alias is generated by calcite, otherwise false
+     */
+    public static boolean isGeneratedAlias(String alias) {
+        assert alias != null;
+        return 
alias.toUpperCase(Locale.ROOT).startsWith(GENERATED_EXPR_ALIAS_PREFIX);
+    }
+
+    /**
+     * Constructs an operator signature from a type list.
+     *
+     * @param op operator
+     * @param typeList list of types to use for operands. Types may be 
represented as {@link
+     *     String}, {@link SqlTypeFamily}, or any object with a valid {@link 
Object#toString()}
+     *     method.
+     * @return constructed signature
+     */
+    public static String getOperatorSignature(SqlOperator op, List<?> 
typeList) {
+        return getAliasedSignature(op, op.getName(), typeList);
+    }
+
+    /**
+     * Constructs an operator signature from a type list, substituting an 
alias for the operator
+     * name.
+     *
+     * @param op operator
+     * @param opName name to use for operator
+     * @param typeList list of {@link SqlTypeName} or {@link String} to use 
for operands
+     * @return constructed signature
+     */
+    public static String getAliasedSignature(SqlOperator op, String opName, 
List<?> typeList) {
+        StringBuilder ret = new StringBuilder();
+        String template = op.getSignatureTemplate(typeList.size());
+        if (null == template) {
+            ret.append("'");
+            ret.append(opName);
+            ret.append("(");
+            for (int i = 0; i < typeList.size(); i++) {
+                if (i > 0) {
+                    ret.append(", ");
+                }
+                final String t = 
String.valueOf(typeList.get(i)).toUpperCase(Locale.ROOT);
+                ret.append("<").append(t).append(">");
+            }
+            ret.append(")'");
+        } else {
+            Object[] values = new Object[typeList.size() + 1];
+            values[0] = opName;
+            ret.append("'");
+            for (int i = 0; i < typeList.size(); i++) {
+                final String t = 
String.valueOf(typeList.get(i)).toUpperCase(Locale.ROOT);
+                values[i + 1] = "<" + t + ">";
+            }
+            ret.append(new MessageFormat(template, 
Locale.ROOT).format(values));
+            ret.append("'");
+            assert (typeList.size() + 1) == values.length;
+        }
+
+        return ret.toString();
+    }
+
+    /** Wraps an exception with context. */
+    public static CalciteException newContextException(
+            final SqlParserPos pos, Resources.ExInst<?> e, String inputText) {
+        CalciteContextException ex = newContextException(pos, e);
+        ex.setOriginalStatement(inputText);
+        return ex;
+    }
+
+    /** Wraps an exception with context. */
+    public static CalciteContextException newContextException(
+            final SqlParserPos pos, Resources.ExInst<?> e) {
+        int line = pos.getLineNum();
+        int col = pos.getColumnNum();
+        int endLine = pos.getEndLineNum();
+        int endCol = pos.getEndColumnNum();
+        return newContextException(line, col, endLine, endCol, e);
+    }
+
+    /** Wraps an exception with context. */
+    public static CalciteContextException newContextException(
+            int line, int col, int endLine, int endCol, Resources.ExInst<?> e) 
{
+        CalciteContextException contextExcn =
+                (line == endLine && col == endCol
+                                ? RESOURCE.validatorContextPoint(line, col)
+                                : RESOURCE.validatorContext(line, col, 
endLine, endCol))
+                        .ex(e.ex());
+        contextExcn.setPosition(line, col, endLine, endCol);
+        return contextExcn;
+    }
+
+    /**
+     * Returns whether a {@link SqlNode node} is a {@link SqlCall call} to a 
given {@link
+     * SqlOperator operator}.
+     */
+    public static boolean isCallTo(SqlNode node, SqlOperator operator) {
+        return (node instanceof SqlCall) && (((SqlCall) node).getOperator() == 
operator);
+    }
+
+    /**
+     * Creates the type of an {@link org.apache.calcite.util.NlsString}.
+     *
+     * <p>The type inherits the NlsString's {@link Charset} and {@link 
SqlCollation}, if they are
+     * set, otherwise it gets the system defaults.
+     *
+     * @param typeFactory Type factory
+     * @param str String
+     * @return Type, including collation and charset
+     */
+    public static RelDataType createNlsStringType(RelDataTypeFactory 
typeFactory, NlsString str) {
+        Charset charset = str.getCharset();
+        if (null == charset) {
+            charset = typeFactory.getDefaultCharset();
+        }
+        SqlCollation collation = str.getCollation();
+        if (null == collation) {
+            collation = SqlCollation.COERCIBLE;
+        }
+        RelDataType type = typeFactory.createSqlType(SqlTypeName.CHAR, 
str.getValue().length());
+        type = typeFactory.createTypeWithCharsetAndCollation(type, charset, 
collation);
+        return type;
+    }
+
+    /**
+     * Translates a character set name from a SQL-level name into a Java-level 
name.
+     *
+     * @param name SQL-level name
+     * @return Java-level name, or null if SQL-level name is unknown
+     */
+    public static @Nullable String translateCharacterSetName(String name) {
+        switch (name) {
+            case "BIG5":
+                return "Big5";
+            case "LATIN1":
+                return "ISO-8859-1";
+            case "UTF8":
+                return "UTF-8";
+            case "UTF16":
+            case "UTF-16":
+                return ConversionUtil.NATIVE_UTF16_CHARSET_NAME;
+            case "GB2312":
+            case "GBK":
+            case "UTF-16BE":
+            case "UTF-16LE":
+            case "ISO-8859-1":
+            case "UTF-8":
+                return name;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Returns the Java-level {@link Charset} based on given SQL-level name.
+     *
+     * @param charsetName Sql charset name, must not be null.
+     * @return charset, or default charset if charsetName is null.
+     * @throws UnsupportedCharsetException If no support for the named charset 
is available in this
+     *     instance of the Java virtual machine
+     */
+    public static Charset getCharset(String charsetName) {
+        assert charsetName != null;
+        charsetName = charsetName.toUpperCase(Locale.ROOT);
+        String javaCharsetName = translateCharacterSetName(charsetName);
+        if (javaCharsetName == null) {
+            throw new UnsupportedCharsetException(charsetName);
+        }
+        return Charset.forName(javaCharsetName);
+    }
+
+    /**
+     * Validate if value can be decoded by given charset.
+     *
+     * @param value nls string in byte array
+     * @param charset charset
+     * @throws RuntimeException If the given value cannot be represented in 
the given charset
+     */
+    @SuppressWarnings("BetaApi")
+    public static void validateCharset(ByteString value, Charset charset) {
+        if (charset == StandardCharsets.UTF_8) {
+            final byte[] bytes = value.getBytes();
+            if (!Utf8.isWellFormed(bytes)) {
+                // CHECKSTYLE: IGNORE 1
+                final String string = new String(bytes, charset);
+                throw RESOURCE.charsetEncoding(string, charset.name()).ex();
+            }
+        }
+    }
+
+    /**
+     * If a node is "AS", returns the underlying expression; otherwise returns 
the node. Returns
+     * null if and only if the node is null.
+     */
+    public static @PolyNull SqlNode stripAs(@PolyNull SqlNode node) {
+        if (node != null && node.getKind() == SqlKind.AS) {
+            return ((SqlCall) node).operand(0);
+        }
+        return node;
+    }
+
+    /**
+     * Modifies a list of nodes, removing AS from each if present.
+     *
+     * @see #stripAs
+     */
+    public static SqlNodeList stripListAs(SqlNodeList nodeList) {
+        for (int i = 0; i < nodeList.size(); i++) {
+            SqlNode n = nodeList.get(i);
+            SqlNode n2 = stripAs(n);
+            if (n != n2) {
+                nodeList.set(i, n2);
+            }
+        }
+        return nodeList;
+    }
+
+    /**
+     * Returns a list of ancestors of {@code predicate} within a given {@code 
SqlNode} tree.
+     *
+     * <p>The first element of the list is {@code root}, and the last is the 
node that matched
+     * {@code predicate}. Throws if no node matches.
+     */
+    public static ImmutableList<SqlNode> getAncestry(
+            SqlNode root, Predicate<SqlNode> predicate, Predicate<SqlNode> 
postPredicate) {
+        try {
+            new Genealogist(predicate, postPredicate).visitChild(root);
+            throw new AssertionError("not found: " + predicate + " in " + 
root);
+        } catch (Util.FoundOne e) {
+            //noinspection unchecked
+            return (ImmutableList<SqlNode>)
+                    Objects.requireNonNull(e.getNode(), "Genealogist result");
+        }
+    }
+
+    /**
+     * Returns an immutable list of {@link RelHint} from sql hints, with a 
given inherit path from
+     * the root node.
+     *
+     * <p>The inherit path would be empty list.
+     *
+     * @param hintStrategies The hint strategies to validate the sql hints
+     * @param sqlHints The sql hints nodes
+     * @return the {@code RelHint} list
+     */
+    public static List<RelHint> getRelHint(
+            HintStrategyTable hintStrategies, @Nullable SqlNodeList sqlHints) {
+        if (sqlHints == null || sqlHints.size() == 0) {
+            return ImmutableList.of();
+        }
+        final ImmutableList.Builder<RelHint> relHints = 
ImmutableList.builder();
+        for (SqlNode node : sqlHints) {
+            assert node instanceof SqlHint;
+            final SqlHint sqlHint = (SqlHint) node;
+            final String hintName = sqlHint.getName();
+
+            final RelHint.Builder builder = RelHint.builder(hintName);
+            switch (sqlHint.getOptionFormat()) {
+                case EMPTY:
+                    // do nothing.
+                    break;
+                case LITERAL_LIST:
+                case ID_LIST:
+                    builder.hintOptions(sqlHint.getOptionList());
+                    break;
+                case KV_LIST:
+                    builder.hintOptions(sqlHint.getOptionKVPairs());
+                    break;
+                default:
+                    throw new AssertionError("Unexpected hint option format");
+            }
+            final RelHint relHint = builder.build();
+            if (hintStrategies.validateHint(relHint)) {
+                // Skips the hint if the validation fails.
+                relHints.add(relHint);
+            }
+        }
+        return relHints.build();
+    }
+
+    /**
+     * Attach the {@code hints} to {@code rel} with specified hint strategies.
+     *
+     * @param hintStrategies The strategies to filter the hints
+     * @param hints The original hints to be attached
+     * @return A copy of {@code rel} if there are any hints can be attached 
given the hint
+     *     strategies, or the original node if such hints don't exist
+     */
+    public static RelNode attachRelHint(
+            HintStrategyTable hintStrategies, List<RelHint> hints, Hintable 
rel) {
+        final List<RelHint> relHints = hintStrategies.apply(hints, (RelNode) 
rel);
+        if (relHints.size() > 0) {
+            return rel.attachHints(relHints);
+        }
+        return (RelNode) rel;
+    }
+
+    /**
+     * Creates a call to an operator.
+     *
+     * <p>Deals with the fact the AND and OR are binary.
+     */
+    public static SqlNode createCall(SqlOperator op, SqlParserPos pos, 
List<SqlNode> operands) {
+        switch (op.kind) {
+            case OR:
+            case AND:
+                // In RexNode trees, OR and AND have any number of children;
+                // SqlCall requires exactly 2. So, convert to a balanced binary
+                // tree for OR/AND, left-deep binary tree for others.
+                switch (operands.size()) {
+                    case 0:
+                        return SqlLiteral.createBoolean(op.kind == 
SqlKind.AND, pos);
+                    case 1:
+                        return operands.get(0);
+                    default:
+                        return createBalancedCall(op, pos, operands, 0, 
operands.size());
+                    case 2:
+                    case 3:
+                    case 4:
+                    case 5:
+                        // fall through
+                }
+                // fall through
+                break;
+            default:
+                break;
+        }
+        if (op instanceof SqlBinaryOperator && operands.size() > 2) {
+            return createLeftCall(op, pos, operands);
+        }
+        return op.createCall(pos, operands);
+    }
+
+    private static SqlNode createLeftCall(
+            SqlOperator op, SqlParserPos pos, List<SqlNode> nodeList) {
+        SqlNode node = op.createCall(pos, nodeList.subList(0, 2));
+        for (int i = 2; i < nodeList.size(); i++) {
+            node = op.createCall(pos, node, nodeList.get(i));
+        }
+        return node;
+    }
+
+    /** Creates a balanced binary call from sql node list, start inclusive, 
end exclusive. */
+    private static SqlNode createBalancedCall(
+            SqlOperator op, SqlParserPos pos, List<SqlNode> operands, int 
start, int end) {
+        assert start < end && end <= operands.size();
+        if (start + 1 == end) {
+            return operands.get(start);
+        }
+        int mid = (end - start) / 2 + start;
+        SqlNode leftNode = createBalancedCall(op, pos, operands, start, mid);
+        SqlNode rightNode = createBalancedCall(op, pos, operands, mid, end);
+        return op.createCall(pos, leftNode, rightNode);
+    }
+
+    // ~ Inner Classes 
----------------------------------------------------------
+
+    /**
+     * Handles particular {@link DatabaseMetaData} methods; invocations of 
other methods will fall
+     * through to the base class, {@link 
org.apache.calcite.util.BarfingInvocationHandler}, which
+     * will throw an error.
+     */
+    public static class DatabaseMetaDataInvocationHandler extends 
BarfingInvocationHandler {
+        private final String databaseProductName;
+        private final String identifierQuoteString;
+
+        public DatabaseMetaDataInvocationHandler(
+                String databaseProductName, String identifierQuoteString) {
+            this.databaseProductName = databaseProductName;
+            this.identifierQuoteString = identifierQuoteString;
+        }
+
+        public String getDatabaseProductName() throws SQLException {
+            return databaseProductName;
+        }
+
+        public String getIdentifierQuoteString() throws SQLException {
+            return identifierQuoteString;
+        }
+    }
+
+    /**
+     * Walks over a {@link org.apache.calcite.sql.SqlNode} tree and returns 
the ancestry stack when
+     * it finds a given node.
+     */
+    private static class Genealogist extends SqlBasicVisitor<Void> {
+        private final List<SqlNode> ancestors = new ArrayList<>();
+        private final Predicate<SqlNode> predicate;
+        private final Predicate<SqlNode> postPredicate;
+
+        Genealogist(Predicate<SqlNode> predicate, Predicate<SqlNode> 
postPredicate) {
+            this.predicate = predicate;
+            this.postPredicate = postPredicate;
+        }
+
+        private Void check(SqlNode node) {
+            preCheck(node);
+            postCheck(node);
+            return null;
+        }
+
+        private Void preCheck(SqlNode node) {
+            if (predicate.test(node)) {
+                throw new Util.FoundOne(ImmutableList.copyOf(ancestors));
+            }
+            return null;
+        }
+
+        private Void postCheck(SqlNode node) {
+            if (postPredicate.test(node)) {
+                throw new Util.FoundOne(ImmutableList.copyOf(ancestors));
+            }
+            return null;
+        }
+
+        private void visitChild(@Nullable SqlNode node) {
+            if (node == null) {
+                return;
+            }
+            ancestors.add(node);
+            node.accept(this);
+            ancestors.remove(ancestors.size() - 1);
+        }
+
+        @Override
+        public Void visit(SqlIdentifier id) {
+            return check(id);
+        }
+
+        @Override
+        public Void visit(SqlCall call) {
+            preCheck(call);
+            for (SqlNode node : call.getOperandList()) {
+                visitChild(node);
+            }
+            return postCheck(call);
+        }
+
+        @Override
+        public Void visit(SqlIntervalQualifier intervalQualifier) {
+            return check(intervalQualifier);
+        }
+
+        @Override
+        public Void visit(SqlLiteral literal) {
+            return check(literal);
+        }
+
+        @Override
+        public Void visit(SqlNodeList nodeList) {
+            preCheck(nodeList);
+            for (SqlNode node : nodeList) {
+                visitChild(node);
+            }
+            return postCheck(nodeList);
+        }
+
+        @Override
+        public Void visit(SqlDynamicParam param) {
+            return check(param);
+        }
+
+        @Override
+        public Void visit(SqlDataTypeSpec type) {
+            return check(type);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
index eb1ca7e8a93..630cfeb1f2b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.stream.sql
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.utils.TableTestBase
 
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 import org.junit.jupiter.api.{BeforeEach, Test}
 
 // TODO add more union case after aggregation and join supported
@@ -130,4 +131,20 @@ class UnionTest extends TableTestBase {
     util.verifyRelPlanWithType(sqlQuery)
   }
 
+  @Test
+  def testSeveralUnionWithOneWrongTypeColumn(): Unit = {
+    val sqlQuery =
+      """
+        | SELECT id, ts, name, timestamp_col, timestamp_ltz_col FROM t2
+        | UNION ALL
+        | SELECT  id, ts, name, timestamp_col, timestamp_ltz_col FROM t3
+        | UNION ALL
+        | SELECT  id, ts, timestamp_col as wrong_column_type, timestamp_col, 
timestamp_ltz_col FROM t2
+      """.stripMargin
+
+    val error = assertThatThrownBy(() => util.verifyRelPlanWithType(sqlQuery))
+
+    error.isInstanceOf(classOf[ValidationException])
+    error.hasMessageContaining("Type mismatch in column 3 of UNION ALL")
+  }
 }

Reply via email to