This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c029d0c27d [FLINK-39577][table] Reuse Calcite's `COALESCE` to apply 
simplifications from `RexSimplify`
8c029d0c27d is described below

commit 8c029d0c27d51bf257c83068ba46af093fdd3bd6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue May 26 21:10:17 2026 +0200

    [FLINK-39577][table] Reuse Calcite's `COALESCE` to apply simplifications 
from `RexSimplify`
---
 .../service/MaterializedTableStatementITCase.java  |   2 +-
 .../functions/BuiltInFunctionDefinitions.java      |   3 +-
 .../calcite/sql/fun/SqlCoalesceFunction.java       |  96 +++++++
 .../expressions/converter/DirectConvertRule.java   |   3 +
 .../functions/sql/FlinkSqlOperatorTable.java       |   1 +
 .../RemoveUnreachableCoalesceArgumentsRule.java    | 212 ---------------
 .../table/planner/codegen/ExprCodeGenerator.scala  |   6 +
 .../planner/codegen/calls/ScalarOperatorGens.scala |  45 ++-
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  20 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  20 +-
 .../planner/functions/CoalesceFunctionITCase.java  | 301 ++++++++++++++++++++-
 .../plan/nodes/exec/common/CalcTestPrograms.java   |  41 +++
 .../plan/nodes/exec/stream/CalcRestoreTest.java    |   3 +-
 ...RemoveUnreachableCoalesceArgumentsRuleTest.java |   5 +-
 .../plan/batch/sql/agg/GroupingSetsTest.xml        |   4 +-
 .../RemoveUnreachableCoalesceArgumentsRuleTest.xml |  10 +-
 ...mplifyCoalesceWithEquiJoinConditionRuleTest.xml |  14 +-
 .../table/planner/plan/stream/sql/CalcTest.xml     |  12 +-
 .../plan/stream/sql/agg/GroupingSetsTest.xml       |   4 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |   4 +-
 .../calc-coalesce/plan/calc-coalesce.json          | 135 +++++++++
 .../calc-coalesce/savepoint/_metadata              | Bin 0 -> 8194 bytes
 .../plan/calc-current-timestamp.json               |  38 ++-
 .../calc-current-timestamp/savepoint/_metadata     | Bin 5868 -> 5860 bytes
 .../runtime/functions/scalar/CoalesceFunction.java |  43 ---
 25 files changed, 698 insertions(+), 324 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 4e128e2cd8d..c3a64b5c8fe 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -1322,7 +1322,7 @@ class MaterializedTableStatementITCase extends 
AbstractMaterializedTableStatemen
         assertThat(newTable.getExpandedQuery())
                 .hasToString(
                         String.format(
-                                "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS 
BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+                                "SELECT COALESCE(`tmp`.`user_id`, 0) AS 
`user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
                                         + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
                                         + "FROM `%s`.`%s`.`datagenSource` AS 
`datagenSource`) AS `tmp`\n"
                                         + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e3d30a5c8da..4d0c9b28dac 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -241,8 +241,7 @@ public final class BuiltInFunctionDefinitions {
                     .kind(SCALAR)
                     .inputTypeStrategy(varyingSequence(COMMON_ARG_NULLABLE, 
COMMON_ARG_NULLABLE))
                     .outputTypeStrategy(nullableIfAllArgs(COMMON))
-                    .runtimeClass(
-                            
"org.apache.flink.table.runtime.functions.scalar.CoalesceFunction")
+                    .runtimeDeferred()
                     .build();
 
     public static final BuiltInFunctionDefinition ARRAY_APPEND =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
new file mode 100644
index 00000000000..59e99e5cba2
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCoalesceFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeTransforms;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** The class copied from Calcite in order to turn off COALESCE rewrite with 
CASE ... WHEN ... */
+public class SqlCoalesceFunction extends SqlFunction {
+    // ~ Constructors 
-----------------------------------------------------------
+
+    public SqlCoalesceFunction() {
+        // NOTE jvs 26-July-2006:  We fill in the type strategies here,
+        // but normally they are not used because the validator invokes
+        // rewriteCall to convert COALESCE into CASE early.  However,
+        // validator rewrite can optionally be disabled, in which case these
+        // strategies are used.
+        super(
+                "COALESCE",
+                SqlKind.COALESCE,
+                
ReturnTypes.LEAST_RESTRICTIVE.andThen(SqlTypeTransforms.LEAST_NULLABLE),
+                null,
+                OperandTypes.SAME_VARIADIC,
+                SqlFunctionCategory.SYSTEM);
+    }
+
+    // ~ Methods 
----------------------------------------------------------------
+
+    // ----- FLINK MODIFICATION BEGIN -----
+    // override SqlOperator
+    @Override
+    public SqlNode rewriteCall(SqlValidator validator, SqlCall call) {
+        validateQuantifier(validator, call); // check DISTINCT/ALL
+
+        List<SqlNode> operands = call.getOperandList();
+
+        if (operands.size() == 1) {
+            return operands.get(0);
+        }
+
+        SqlParserPos pos = call.getParserPosition();
+        List<SqlNode> nodes = new ArrayList<>();
+        for (SqlNode operand : operands) {
+            if (!SqlUtil.isNullLiteral(operand, false)) {
+                nodes.add(operand);
+            }
+        }
+
+        if (nodes.isEmpty()) {
+            return SqlLiteral.createNull(pos);
+        }
+        if (nodes.size() == 1) {
+            return nodes.get(0);
+        }
+
+        return new SqlBasicCall(this, nodes, pos);
+    }
+
+    // ----- FLINK MODIFICATION END -----
+
+    @Override
+    public SqlReturnTypeInference getReturnTypeInference() {
+        return requireNonNull(super.getReturnTypeInference(), 
"returnTypeInference");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index a2d4f8b0df0..17dffb8613f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -298,6 +298,9 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.ARRAY_ELEMENT, 
FlinkSqlOperatorTable.ELEMENT);
 
+        definitionSqlOperatorHashMap.put(
+                BuiltInFunctionDefinitions.COALESCE, 
FlinkSqlOperatorTable.COALESCE);
+
         // crypto hash
         definitionSqlOperatorHashMap.put(BuiltInFunctionDefinitions.MD5, 
FlinkSqlOperatorTable.MD5);
         definitionSqlOperatorHashMap.put(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index af57daf6b26..f44f1806df3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1203,6 +1203,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
     public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
     public static final SqlFunction EXP = SqlStdOperatorTable.EXP;
     public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF;
+    public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
     public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
     public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
     public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
deleted file mode 100644
index 47cde317256..00000000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.rules.logical;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.sql.SqlOperator;
-import org.immutables.value.Value;
-
-import java.util.List;
-import java.util.function.Predicate;
-
-/**
- * Removes unreachable {@link BuiltInFunctionDefinitions#COALESCE} arguments.
- *
- * <p>An unreachable COALESCE argument is defined as any argument after the 
first argument in the
- * argument list with a non-null type.
- */
-@Internal
-@Value.Enclosing
-public class RemoveUnreachableCoalesceArgumentsRule
-        extends RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> {
-
-    public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> 
PROJECT_INSTANCE =
-            Config.DEFAULT.withProject().toRule();
-    public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> 
FILTER_INSTANCE =
-            Config.DEFAULT.withFilter().toRule();
-    public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> 
JOIN_INSTANCE =
-            Config.DEFAULT.withJoin().toRule();
-    public static final RelRule<RemoveUnreachableCoalesceArgumentsRule.Config> 
CALC_INSTANCE =
-            Config.DEFAULT.withCalc().toRule();
-
-    public RemoveUnreachableCoalesceArgumentsRule(Config config) {
-        super(config);
-    }
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-        final RelNode relNode = call.rel(0);
-        final RexBuilder rexBuilder = relNode.getCluster().getRexBuilder();
-        call.transformTo(
-                relNode.accept(new 
UnreachableCoalesceArgumentsRemoveRexShuttle(rexBuilder)));
-    }
-
-    private static class UnreachableCoalesceArgumentsRemoveRexShuttle extends 
RexShuttle {
-        private final RexBuilder rexBuilder;
-
-        private UnreachableCoalesceArgumentsRemoveRexShuttle(RexBuilder 
rexBuilder) {
-            this.rexBuilder = rexBuilder;
-        }
-
-        @Override
-        public RexNode visitCall(RexCall call) {
-            call = (RexCall) super.visitCall(call);
-
-            // Not a coalesce invocation, skip it
-            if (!operatorIsCoalesce(call.getOperator())) {
-                return call;
-            }
-
-            final int firstNonNullableArgIndex = 
getFirstNonNullableArgumentIndex(call);
-
-            // If it's the first argument, just return the argument without 
the coalesce invocation
-            if (firstNonNullableArgIndex == 0) {
-                RexNode operand = call.operands.get(0);
-                if (call.getType().equals(operand.getType())) {
-                    return operand;
-                } else {
-                    return rexBuilder.makeCast(call.getType(), operand);
-                }
-            }
-
-            // If it's the last argument, or no non-null argument was found, 
return the original
-            // call
-            if (firstNonNullableArgIndex == call.operands.size() - 1
-                    || firstNonNullableArgIndex == -1) {
-                return call;
-            }
-
-            // Return the coalesce invocation with a trimmed argument list
-            final List<RexNode> trimmedOperandsList =
-                    call.operands.subList(0, firstNonNullableArgIndex + 1);
-            return call.clone(call.getType(), trimmedOperandsList);
-        }
-
-        private int getFirstNonNullableArgumentIndex(RexCall call) {
-            for (int argIndex = 0; argIndex < call.operands.size(); 
argIndex++) {
-                if (!call.operands.get(argIndex).getType().isNullable()) {
-                    return argIndex;
-                }
-            }
-            return -1;
-        }
-    }
-
-    private static boolean hasCoalesceInvocation(RexNode node) {
-        return FlinkRexUtil.hasOperatorCallMatching(
-                node, 
RemoveUnreachableCoalesceArgumentsRule::operatorIsCoalesce);
-    }
-
-    private static boolean operatorIsCoalesce(SqlOperator op) {
-        return op instanceof BridgingSqlFunction
-                && ((BridgingSqlFunction) op)
-                        .getDefinition()
-                        .equals(BuiltInFunctionDefinitions.COALESCE);
-    }
-
-    // 
---------------------------------------------------------------------------------------------
-
-    /** Configuration for {@link RemoveUnreachableCoalesceArgumentsRule}. */
-    @Value.Immutable(singleton = false)
-    public interface Config extends RelRule.Config {
-
-        Config DEFAULT =
-                
ImmutableRemoveUnreachableCoalesceArgumentsRule.Config.builder()
-                        .build()
-                        .as(Config.class);
-
-        @Override
-        default RemoveUnreachableCoalesceArgumentsRule toRule() {
-            return new RemoveUnreachableCoalesceArgumentsRule(this);
-        }
-
-        default Config withProject() {
-            Predicate<Project> projectPredicate =
-                    lp ->
-                            lp.getProjects().stream()
-                                    .anyMatch(
-                                            
RemoveUnreachableCoalesceArgumentsRule
-                                                    ::hasCoalesceInvocation);
-            final RelRule.OperandTransform projectTransform =
-                    operandBuilder ->
-                            operandBuilder
-                                    .operand(Project.class)
-                                    .predicate(projectPredicate)
-                                    .anyInputs();
-
-            return withOperandSupplier(projectTransform).as(Config.class);
-        }
-
-        default Config withFilter() {
-            Predicate<Filter> filterPredicate =
-                    lf ->
-                            
RemoveUnreachableCoalesceArgumentsRule.hasCoalesceInvocation(
-                                    lf.getCondition());
-            final RelRule.OperandTransform filterTransform =
-                    operandBuilder ->
-                            operandBuilder
-                                    .operand(Filter.class)
-                                    .predicate(filterPredicate)
-                                    .anyInputs();
-
-            return withOperandSupplier(filterTransform).as(Config.class);
-        }
-
-        default Config withJoin() {
-            Predicate<Join> joinPredicate =
-                    lj ->
-                            
RemoveUnreachableCoalesceArgumentsRule.hasCoalesceInvocation(
-                                    lj.getCondition());
-            final RelRule.OperandTransform joinTransform =
-                    operandBuilder ->
-                            
operandBuilder.operand(Join.class).predicate(joinPredicate).anyInputs();
-
-            return withOperandSupplier(joinTransform).as(Config.class);
-        }
-
-        default Config withCalc() {
-            Predicate<Calc> calcPredicate =
-                    lc ->
-                            lc.getProgram().getExprList().stream()
-                                    .anyMatch(
-                                            
RemoveUnreachableCoalesceArgumentsRule
-                                                    ::hasCoalesceInvocation);
-            final RelRule.OperandTransform joinTransform =
-                    operandBuilder ->
-                            
operandBuilder.operand(Calc.class).predicate(calcPredicate).anyInputs();
-
-            return withOperandSupplier(joinTransform).as(Config.class);
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 0fc9db1e1c1..f07bf80bc05 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -772,6 +772,9 @@ class ExprCodeGenerator(
       case CASE =>
         generateIfElse(ctx, operands, resultType)
 
+      case COALESCE =>
+        generateCoalesce(ctx, operands, resultType)
+
       case IS_TRUE =>
         val operand = operands.head
         requireBoolean(operand)
@@ -955,6 +958,9 @@ class ExprCodeGenerator(
           case BuiltInFunctionDefinitions.REGEXP_REPLACE =>
             StringCallGen.generateRegexpReplace(ctx, operands, resultType)
 
+          case BuiltInFunctionDefinitions.COALESCE =>
+            generateCoalesce(ctx, operands, resultType)
+
           case _ =>
             new BridgingSqlFunctionCallGen(call, rexProgram).generate(ctx, 
operands, resultType)
         }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 28c0e06ca97..4e3a79cf7a2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.api.{TableRuntimeException, ValidationException}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
+import org.apache.flink.table.data.binary.BinaryArrayData
 import org.apache.flink.table.data.util.MapDataUtil
 import org.apache.flink.table.data.utils.CastExecutor
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
@@ -40,7 +40,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes, 
getPrecision, getScale}
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeMerging.findCommonType
-import org.apache.flink.table.utils.DateTimeUtils.{MILLIS_PER_DAY, 
MILLIS_PER_SECOND}
+import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.ColumnList
 import org.apache.flink.util.Preconditions.checkArgument
@@ -1034,6 +1034,47 @@ object ScalarOperatorGens {
     }
   }
 
+  def generateCoalesce(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      resultType: LogicalType): GeneratedExpression = {
+    if (operands.size == 1) {
+      generateCast(ctx, operands.head, resultType, nullOnFailure = false)
+    } else {
+      val condition =
+        if (operands.head.resultType.equals(resultType)) {
+          operands.head
+        } else {
+          generateCast(ctx, operands.head, resultType, nullOnFailure = false)
+        }
+      val falseAction = generateCoalesce(ctx, operands.tail, resultType)
+
+      val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+      val resultTypeTerm = primitiveTypeTermForType(resultType)
+      val defaultValue = primitiveDefaultValue(resultType)
+
+      val operatorCode =
+        s"""
+           |$resultTypeTerm $resultTerm = $defaultValue;
+           |// coalesce
+           |${condition.code}
+           |boolean $nullTerm = ${condition.nullTerm};
+           |if (!$nullTerm) {
+           |   $resultTerm = ${condition.resultTerm};
+           |} else {
+           |  ${falseAction.code}
+           |  $nullTerm = ${falseAction.nullTerm};
+           |  if (!$nullTerm) {
+           |    $resultTerm = ${falseAction.resultTerm};
+           |  }
+           |}
+           |// end coalesce
+           |""".stripMargin.trim
+
+      GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+    }
+  }
+
   def generateIfElse(
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 7ff8a16a70e..f102f593ac3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.rules
 
 import org.apache.flink.table.planner.plan.nodes.logical._
+import 
org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets.SIMPLIFY_COALESCE_RULES
 import org.apache.flink.table.planner.plan.rules.logical._
 import 
org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
 import org.apache.flink.table.planner.plan.rules.physical.batch._
@@ -78,10 +79,6 @@ object FlinkBatchRuleSets {
 
   /** RuleSet to simplify coalesce invocations */
   private val SIMPLIFY_COALESCE_RULES: RuleSet = RuleSets.ofList(
-    RemoveUnreachableCoalesceArgumentsRule.PROJECT_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.FILTER_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.JOIN_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.CALC_INSTANCE,
     SimplifyCoalesceWithEquiJoinConditionRule.PROJECT_INSTANCE,
     SimplifyCoalesceWithEquiJoinConditionRule.CALC_INSTANCE
   )
@@ -117,11 +114,20 @@ object FlinkBatchRuleSets {
         // let project transpose window operator.
         CoreRules.PROJECT_WINDOW_TRANSPOSE,
         // ensure union set operator have the same row type
-        new CoerceInputsRule(classOf[LogicalUnion], false),
+        CoerceInputsRule.Config.DEFAULT
+          .withCoerceNames(false)
+          .withConsumerRelClass(classOf[LogicalUnion])
+          .toRule,
         // ensure intersect set operator have the same row type
-        new CoerceInputsRule(classOf[LogicalIntersect], false),
+        CoerceInputsRule.Config.DEFAULT
+          .withCoerceNames(false)
+          .withConsumerRelClass(classOf[LogicalIntersect])
+          .toRule,
         // ensure except set operator have the same row type
-        new CoerceInputsRule(classOf[LogicalMinus], false),
+        CoerceInputsRule.Config.DEFAULT
+          .withCoerceNames(false)
+          .withConsumerRelClass(classOf[LogicalMinus])
+          .toRule,
         ConvertToNotInOrInRule.INSTANCE,
         // optimize limit 0
         PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 05de12a6950..25ad791f53c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -79,12 +79,7 @@ object FlinkStreamRuleSets {
     CoreRules.JOIN_REDUCE_EXPRESSIONS
   )
 
-  /** RuleSet to simplify coalesce invocations */
   private val SIMPLIFY_COALESCE_RULES: RuleSet = RuleSets.ofList(
-    RemoveUnreachableCoalesceArgumentsRule.PROJECT_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.FILTER_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.JOIN_INSTANCE,
-    RemoveUnreachableCoalesceArgumentsRule.CALC_INSTANCE,
     SimplifyCoalesceWithEquiJoinConditionRule.PROJECT_INSTANCE,
     SimplifyCoalesceWithEquiJoinConditionRule.CALC_INSTANCE
   )
@@ -117,11 +112,20 @@ object FlinkStreamRuleSets {
           WindowPropertiesRules.WINDOW_PROPERTIES_RULE,
           WindowPropertiesRules.WINDOW_PROPERTIES_HAVING_RULE,
           // ensure union set operator have the same row type
-          new CoerceInputsRule(classOf[LogicalUnion], false),
+          CoerceInputsRule.Config.DEFAULT
+            .withCoerceNames(false)
+            .withConsumerRelClass(classOf[LogicalUnion])
+            .toRule,
           // ensure intersect set operator have the same row type
-          new CoerceInputsRule(classOf[LogicalIntersect], false),
+          CoerceInputsRule.Config.DEFAULT
+            .withCoerceNames(false)
+            .withConsumerRelClass(classOf[LogicalIntersect])
+            .toRule,
           // ensure except set operator have the same row type
-          new CoerceInputsRule(classOf[LogicalMinus], false),
+          CoerceInputsRule.Config.DEFAULT
+            .withCoerceNames(false)
+            .withConsumerRelClass(classOf[LogicalMinus])
+            .toRule,
           ConvertToNotInOrInRule.INSTANCE,
           // optimize limit 0
           PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
index 1a212141d14..ec64f2983ca 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CoalesceFunctionITCase.java
@@ -19,21 +19,311 @@
 package org.apache.flink.table.planner.functions;
 
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
 
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.api.DataTypes.ARRAY;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
 import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
 import static org.apache.flink.table.api.Expressions.coalesce;
+import static org.apache.flink.util.CollectionUtil.entry;
+import static org.apache.flink.util.CollectionUtil.map;
 
 /** Test {@link BuiltInFunctionDefinitions#COALESCE} and its return type. */
 class CoalesceFunctionITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
-        return Stream.of(
-                TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE)
+        final List<TestSetSpec> specs = new ArrayList<>();
+        specs.addAll(allTypesBasic());
+        specs.addAll(typePromotion());
+        specs.addAll(lazyEvaluation());
+        specs.addAll(constants());
+        return specs.stream();
+    }
+
+    private static List<TestSetSpec> allTypesBasic() {
+        return List.of(
+                basicSpec("BOOLEAN", BOOLEAN(), true, false),
+                basicSpec("TINYINT", TINYINT(), (byte) 5, (byte) 10),
+                basicSpec("SMALLINT", SMALLINT(), (short) 100, (short) 200),
+                basicSpec("INT", INT(), 1, 2),
+                basicSpec("BIGINT", BIGINT(), 100L, 200L),
+                basicSpec("FLOAT", FLOAT(), 1.5f, 2.5f),
+                basicSpec("DOUBLE", DOUBLE(), 1.5d, 2.5d),
+                basicSpec(
+                        "DECIMAL",
+                        DECIMAL(5, 2),
+                        new BigDecimal("123.45"),
+                        new BigDecimal("234.56")),
+                basicSpec("CHAR", CHAR(5), "hello", "world"),
+                basicSpec("VARCHAR", VARCHAR(10), "hello", "world"),
+                basicSpec("STRING", STRING(), "hello", "world"),
+                basicSpec("BINARY", BINARY(2), new byte[] {0, 1}, new byte[] 
{2, 3}),
+                basicSpec("VARBINARY", VARBINARY(5), new byte[] {0, 1, 2}, new 
byte[] {3, 4}),
+                basicSpec("BYTES", BYTES(), new byte[] {0, 1, 2}, new byte[] 
{3, 4, 5}),
+                basicSpec("DATE", DATE(), LocalDate.of(2026, 1, 1), 
LocalDate.of(2026, 12, 31)),
+                basicSpec("TIME", TIME(), LocalTime.of(12, 34, 56), 
LocalTime.of(23, 59, 59)),
+                basicSpec(
+                        "TIMESTAMP",
+                        TIMESTAMP(),
+                        LocalDateTime.of(2026, 1, 1, 12, 0, 0),
+                        LocalDateTime.of(2026, 12, 31, 23, 59, 59)),
+                basicSpec(
+                        "TIMESTAMP_LTZ",
+                        TIMESTAMP_LTZ(),
+                        Instant.parse("2026-01-01T12:00:00Z"),
+                        Instant.parse("2026-12-31T23:59:59Z")),
+                basicSpec(
+                        "INTERVAL_MONTH",
+                        INTERVAL(MONTH()),
+                        Period.ofMonths(18),
+                        Period.ofMonths(27)),
+                basicSpec(
+                        "INTERVAL_SECOND",
+                        INTERVAL(SECOND(3)),
+                        Duration.ofMillis(12345),
+                        Duration.ofMillis(67890)),
+                basicSpec("ARRAY", ARRAY(INT()), new Integer[] {1, 2, 3}, new 
Integer[] {4, 5, 6}),
+                basicSpec(
+                        "MAP",
+                        MAP(STRING(), INT()),
+                        map(entry("a", 1), entry("b", 2)),
+                        map(entry("c", 3), entry("d", 4))),
+                rowSpec());
+    }
+
+    private static TestSetSpec basicSpec(String name, DataType type, Object 
value1, Object value2) {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE, 
name)
+                .onFieldsWithData(null, value1, value2)
+                .andDataTypes(type.nullable(), type.notNull(), type.notNull())
+                .testResult(coalesce($("f0"), $("f1")), "COALESCE(f0, f1)", 
value1, type.notNull())
+                .testResult(coalesce($("f1"), $("f2")), "COALESCE(f1, f2)", 
value1, type.notNull())
+                .testResult(
+                        coalesce($("f0"), $("f0"), $("f2")),
+                        "COALESCE(f0, f0, f2)",
+                        value2,
+                        type.notNull());
+    }
+
+    private static TestSetSpec rowSpec() {
+        DataType rowType = ROW(FIELD("a", INT()), FIELD("b", STRING()));
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE, 
"ROW")
+                .onFieldsWithData(null, Row.of(1, "hello"), Row.of(2, "world"))
+                .andDataTypes(rowType.nullable(), rowType.notNull(), 
rowType.notNull())
+                .testResult(
+                        coalesce($("f0"), $("f1")),
+                        "COALESCE(f0, f1)",
+                        Row.of(1, "hello"),
+                        rowType.notNull())
+                .testResult(
+                        coalesce($("f1"), $("f2")),
+                        "COALESCE(f1, f2)",
+                        Row.of(1, "hello"),
+                        rowType.notNull());
+    }
+
+    /**
+     * Verifies the LEAST_RESTRICTIVE return-type inference combined with 
LEAST_NULLABLE: mixing
+     * compatible operand types yields the widest type, and nullability is 
dropped if any operand is
+     * NOT NULL. See also {@link org.apache.flink.table.types.logical.utils.LogicalTypeMerging}.
+     */
+    private static List<TestSetSpec> typePromotion() {
+        return List.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE, 
"INT and BIGINT")
+                        .onFieldsWithData(null, 1, 2L)
+                        .andDataTypes(INT().nullable(), INT().nullable(), 
BIGINT().notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                1L,
+                                BIGINT().notNull()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.COALESCE, 
"TINYINT and INT")
+                        .onFieldsWithData(null, (byte) 7, 42)
+                        .andDataTypes(TINYINT().nullable(), 
TINYINT().nullable(), INT().notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                7,
+                                INT().notNull()),
+
+                // TIMESTAMP precision widening: TIMESTAMP(0) < TIMESTAMP(3) → 
declared
+                // TIMESTAMP(3). Calcite stores TIMESTAMP as a Long millis 
value, so widening
+                // precision does not change the underlying value.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "TIMESTAMP(0) and TIMESTAMP(3)")
+                        .onFieldsWithData(
+                                null,
+                                LocalDateTime.parse("2026-01-01T00:00:00"),
+                                LocalDateTime.parse("2026-01-01T00:00:00.123"))
+                        .andDataTypes(
+                                TIMESTAMP(0).nullable(),
+                                TIMESTAMP(0).nullable(),
+                                TIMESTAMP(3).notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                LocalDateTime.parse("2026-01-01T00:00:00"),
+                                TIMESTAMP(3).notNull()),
+
+                // DECIMAL precision widening, same scale: DECIMAL(5,2) < 
DECIMAL(10,2)
+                //   → declared DECIMAL(10, 2).
+                // Same scale → underlying BigDecimal representation pre/post 
simplify is
+                // identical.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "DECIMAL(5,2) and DECIMAL(10,2) (same scale)")
+                        .onFieldsWithData(
+                                null, new BigDecimal("1.23"), new 
BigDecimal("9876543.21"))
+                        .andDataTypes(
+                                DECIMAL(5, 2).nullable(),
+                                DECIMAL(5, 2).nullable(),
+                                DECIMAL(10, 2).notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                new BigDecimal("1.23"),
+                                DECIMAL(10, 2).notNull()),
+
+                // Based on LogicalTypeMerging#createCommonExactNumericType
+                // d = max(p1 - s1, p2 - s2)
+                // s <= max(s1, s2)
+                // p = s + d
+                // EXAMPLE: DECIMAL precision and scale widening: DECIMAL(5,2) 
< DECIMAL(10,4)
+                //   d = max(p1-s1, p2-s2) = max(3, 6) = 6,
+                //   scale = max(2, 4) = 4,
+                //   p = 10.
+                // Thus DECIMAL(10, 4).
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "DECIMAL(5,2) and DECIMAL(10,4) (different 
scale)")
+                        .onFieldsWithData(null, new BigDecimal("1.23"), new 
BigDecimal("4.5678"))
+                        .andDataTypes(
+                                DECIMAL(5, 2).nullable(),
+                                DECIMAL(5, 2).nullable(),
+                                DECIMAL(10, 4).notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                new BigDecimal("1.2300"),
+                                DECIMAL(10, 4).notNull()),
+
+                // INTERVAL YEAR TO MONTH — same shape on every operand. 
Stored as a single int
+                // (months); precision is metadata, the underlying value is 
unchanged.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE, "INTERVAL 
MONTH precision")
+                        .onFieldsWithData(null, Period.ofMonths(2), 
Period.ofMonths(5))
+                        .andDataTypes(
+                                INTERVAL(MONTH()).nullable(),
+                                INTERVAL(MONTH()).nullable(),
+                                INTERVAL(MONTH()).notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                Period.ofMonths(2),
+                                INTERVAL(MONTH()).notNull()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "INTERVAL SECOND same precision")
+                        .onFieldsWithData(null, Duration.ofSeconds(10), 
Duration.ofMillis(15000))
+                        .andDataTypes(
+                                INTERVAL(SECOND(3)).nullable(),
+                                INTERVAL(SECOND(3)).nullable(),
+                                INTERVAL(SECOND(3)).notNull())
+                        .testResult(
+                                coalesce($("f0"), $("f1"), $("f2")),
+                                "COALESCE(f0, f1, f2)",
+                                Duration.ofSeconds(10),
+                                INTERVAL(SECOND(3)).notNull()));
+    }
+
+    /** Lazy evaluation: a non-null operand short-circuits the rest. */
+    private static List<TestSetSpec> lazyEvaluation() {
+        return List.of(
+                // First arg non-null at runtime: subsequent ThrowingFunction 
must NOT be called.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "lazy: first operand non-null skips remainder")
+                        .onFieldsWithData(1, 100)
+                        .andDataTypes(INT().nullable(), INT().notNull())
+                        .withFunction(ThrowingFunction.class)
+                        .testResult(
+                                coalesce($("f0"), call("ThrowingFunction", 
$("f1"))),
+                                "COALESCE(f0, ThrowingFunction(f1))",
+                                1,
+                                INT().notNull()),
+                // Middle arg non-null at runtime: ThrowingFunction in the 
third slot must NOT be
+                // called.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "lazy: middle operand non-null skips 
remainder")
+                        .onFieldsWithData(null, 5, 100)
+                        .andDataTypes(INT().nullable(), INT().nullable(), 
INT().notNull())
+                        .withFunction(ThrowingFunction.class)
+                        .testResult(
+                                coalesce($("f0"), $("f1"), 
call("ThrowingFunction", $("f2"))),
+                                "COALESCE(f0, f1, ThrowingFunction(f2))",
+                                5,
+                                INT().notNull()),
+                // Negative control: the previous operand IS null at runtime, 
so ThrowingFunction
+                // must be reached and must throw.
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "negative control: throwing UDF fires when 
reached")
+                        .onFieldsWithData(null, 100)
+                        .andDataTypes(INT().nullable(), INT().notNull())
+                        .withFunction(ThrowingFunction.class)
+                        .testTableApiRuntimeError(
+                                coalesce($("f0"), call("ThrowingFunction", 
$("f1"))),
+                                "ThrowingFunction was called")
+                        .testSqlRuntimeError(
+                                "COALESCE(f0, ThrowingFunction(f1))",
+                                "ThrowingFunction was called"));
+    }
+
+    private static List<TestSetSpec> constants() {
+        return Collections.singletonList(
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.COALESCE,
+                                "constants and nullability inference")
                         .onFieldsWithData(null, null, 1)
                         .andDataTypes(BIGINT().nullable(), INT().nullable(), 
INT().notNull())
                         .testResult(
@@ -56,4 +346,11 @@ class CoalesceFunctionITCase extends 
BuiltInFunctionTestBase {
                                 // constant in the function invocation
                                 BIGINT().notNull()));
     }
+
+    /** Function that throws on every invocation. */
+    public static class ThrowingFunction extends ScalarFunction {
+        public int eval(int i) {
+            throw new RuntimeException("ThrowingFunction was called");
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index 0a92bc0f41a..940c90212e4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
 
@@ -300,4 +301,44 @@ public class CalcTestPrograms {
                                     + "FROM orders a LEFT JOIN 
order_details_row b "
                                     + "ON a.order_id = b.r.order_id")
                     .build();
+
+    public static final TableTestProgram COALESCE =
+            TableTestProgram.of("calc-coalesce", "validates coalesce node")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "a DECIMAL(2, 1)",
+                                            "b DECIMAL(4, 2)",
+                                            "c TIMESTAMP(0)",
+                                            "d TIMESTAMP(3)")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    null,
+                                                    new BigDecimal("11.22"),
+                                                    null,
+                                                    LocalDateTime.of(
+                                                            1970, 1, 1, 0, 0, 
0, 123_000_000)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    new BigDecimal("5.3"),
+                                                    null,
+                                                    LocalDateTime.of(2000, 2, 
2, 2, 2, 2),
+                                                    null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("x DECIMAL(4, 2)", "y 
TIMESTAMP(3)")
+                                    .consumedBeforeRestore(
+                                            Row.of(
+                                                    new BigDecimal("11.22"),
+                                                    LocalDateTime.of(
+                                                            1970, 1, 1, 0, 0, 
0, 123_000_000)))
+                                    .consumedAfterRestore(
+                                            Row.of(
+                                                    new BigDecimal("5.30"),
+                                                    LocalDateTime.of(2000, 2, 
2, 2, 2, 2)))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT COALESCE(a, b) AS x, 
COALESCE(c, d) AS y FROM t")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 8de2e05e054..e26f2aa1864 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -42,6 +42,7 @@ public class CalcRestoreTest extends RestoreTestBase {
                 CalcTestPrograms.CALC_SARG,
                 CalcTestPrograms.CALC_UDF_SIMPLE,
                 CalcTestPrograms.CALC_UDF_COMPLEX,
-                CalcTestPrograms.CALC_CURRENT_TIMESTAMP);
+                CalcTestPrograms.CALC_CURRENT_TIMESTAMP,
+                CalcTestPrograms.COALESCE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
index 3610b20d6f3..397b117fa16 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
@@ -30,7 +30,10 @@ import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.table.api.DataTypes.STRING;
 
-/** Test rule {@link RemoveUnreachableCoalesceArgumentsRule}. */
+/**
+ * Tests that {@code COALESCE} arguments appearing after the first {@code NOT 
NULL} argument are
+ * pruned from the plan, since they can never be reached.
+ */
 class RemoveUnreachableCoalesceArgumentsRuleTest extends TableTestBase {
 
     private StreamTableTestUtil util;
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
index 4ec32df7293..ae9e1459285 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/GroupingSetsTest.xml
@@ -749,14 +749,14 @@ GROUP BY GROUPING SETS ((a, b), (a, b, c))
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], EXPR$2=[coalesce($2, _UTF-16LE'empty')], 
EXPR$3=[$3])
+LogicalProject(a=[$0], b=[$1], EXPR$2=[COALESCE($2, _UTF-16LE'empty')], 
EXPR$3=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}]], 
EXPR$3=[AVG($3)])
    +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, coalesce(c, 'empty') AS EXPR$2, EXPR$3])
+Calc(select=[a, b, COALESCE(c, 'empty') AS EXPR$2, EXPR$3])
 +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e, 
Final_AVG(sum$0, count$1) AS EXPR$3])
    +- Exchange(distribution=[hash[a, b, c, $e]])
       +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, 
Partial_AVG(d) AS (sum$0, count$1)])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
index 38eb87d03bd..1003f2fdbef 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
@@ -39,7 +39,7 @@ Calc(select=[COALESCE(f0, f2) AS EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[COALESCE($1, _UTF-16LE'-')])
+LogicalProject(EXPR$0=[$1])
 +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
@@ -56,7 +56,7 @@ Calc(select=[f1 AS EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[COALESCE($0, $1, _UTF-16LE'-')])
+LogicalProject(EXPR$0=[COALESCE($0, $1)])
 +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
@@ -94,7 +94,7 @@ Calc(select=[f0, f1, f2], where=[=(COALESCE(f0, f1), 'abc')])
 LogicalProject(f0=[$0], f1=[$1], f2=[$2], f00=[$3], f10=[$4], f20=[$5])
 +- LogicalProject(f0=[$0], f1=[$1], f2=[$2], f00=[$4], f10=[$5], f20=[$6])
    +- LogicalJoin(condition=[=($3, $4)], joinType=[left])
-      :- LogicalProject(f0=[$0], f1=[$1], f2=[$2], $f3=[COALESCE($0, 
_UTF-16LE'-', $2)])
+      :- LogicalProject(f0=[$0], f1=[$1], f2=[$2], $f3=[COALESCE($0, 
_UTF-16LE'-')])
       :  +- LogicalTableScan(table=[[default_catalog, default_database, T]])
       +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
@@ -134,13 +134,13 @@ COALESCE(cast(NULL as double), cast(NULL as double))]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[COALESCE(1)], EXPR$1=[COALESCE(1, 2)], 
EXPR$2=[COALESCE(null:INTEGER, 2)], EXPR$3=[COALESCE(1, null:INTEGER)], 
EXPR$4=[COALESCE(null:INTEGER, null:INTEGER, 3)], EXPR$5=[COALESCE(4, 
null:INTEGER, null:INTEGER, null:INTEGER)], EXPR$6=[COALESCE(_UTF-16LE'1')], 
EXPR$7=[COALESCE(_UTF-16LE'1', _UTF-16LE'23')], 
EXPR$8=[COALESCE(null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
_UTF-16LE'2')], EXPR$9=[COALESCE(_UTF-16LE'1', null:VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE [...]
+LogicalProject(EXPR$0=[1], EXPR$1=[1], EXPR$2=[2], EXPR$3=[1], EXPR$4=[3], 
EXPR$5=[4], EXPR$6=[_UTF-16LE'1'], EXPR$7=[_UTF-16LE'1':VARCHAR(2) CHARACTER 
SET "UTF-16LE"], EXPR$8=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"], EXPR$9=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"], EXPR$10=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"], EXPR$11=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"], EXPR$12=[1.0:DECIMAL(2, 1)], EXPR$13=[1.0:DECIMAL [...]
 +- LogicalValues(tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4, 
4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS 
EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2E0 AS EXPR$14, 2E0 AS 
EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
+Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4, 
4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS 
EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2.0 AS EXPR$14, 2.0 AS 
EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
 +- Values(tuples=[[{ 0 }]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
index e2b46196f6c..ea49ada9e0c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyCoalesceWithEquiJoinConditionRuleTest.xml
@@ -45,7 +45,7 @@ Calc(select=[COALESCE(order_id0, order_id) AS order_id])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(order_id=[COALESCE($3, $0)])
+LogicalProject(order_id=[$3])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -153,7 +153,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
order_details, projec
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(order_id=[COALESCE($0, $3)], amount=[$2])
+LogicalProject(order_id=[$0], amount=[$2])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -171,7 +171,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
orders, project=[orde
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(val=[COALESCE($3, $0, $1)])
+LogicalProject(val=[COALESCE($3, $0)])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -189,7 +189,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
orders, project=[orde
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(val=[COALESCE($1, $3, $0)])
+LogicalProject(val=[$1])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -212,7 +212,7 @@ Calc(select=[user_id AS val])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(val=[COALESCE($1, $0, $3)])
+LogicalProject(val=[$1])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -230,7 +230,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
orders, project=[user
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(val=[COALESCE($3, $1, $0)])
+LogicalProject(val=[COALESCE($3, $1)])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
@@ -253,7 +253,7 @@ Calc(select=[COALESCE(order_id0, user_id) AS val])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(order_id=[COALESCE($3, $0, 0:BIGINT)])
+LogicalProject(order_id=[COALESCE($3, $0)])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
order_details]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 393b486d2f2..11b409c98fe 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -109,14 +109,14 @@ Calc(select=[a], where=[>(random_udf(b), 10)])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
+LogicalProject(field2=[$1], transactionId=[$0.data.nested.trId])
 +- LogicalProject(field1=[$0], field2=[$0])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameType]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS 
transactionId])
+Calc(select=[field1 AS field2, field1.data.nested.trId AS transactionId])
 +- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameType]], fields=[field1])
 ]]>
     </Resource>
@@ -127,14 +127,14 @@ Calc(select=[field1 AS field2, 
COALESCE(field1.data.nested.trId) AS transactionI
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE(ITEM($0.data, 
0).nested.trId)])
+LogicalProject(field2=[$1], transactionId=[ITEM($0.data, 0).nested.trId])
 +- LogicalProject(field1=[$0], field2=[$0])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithArray]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[field1 AS field2, COALESCE(ITEM(field1.data, 0).nested.trId) AS 
transactionId])
+Calc(select=[field1 AS field2, ITEM(field1.data, 0).nested.trId AS 
transactionId])
 +- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithArray]], fields=[field1])
 ]]>
     </Resource>
@@ -145,14 +145,14 @@ Calc(select=[field1 AS field2, COALESCE(ITEM(field1.data, 
0).nested.trId) AS tra
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
+LogicalProject(field2=[$1], transactionId=[$0.data.nested.trId])
 +- LogicalProject(field1=[$0], field2=[$0])
    +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithNullableNestedType]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS 
transactionId])
+Calc(select=[field1 AS field2, field1.data.nested.trId AS transactionId])
 +- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithNullableNestedType]], fields=[field1])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
index 88e69acd291..a5a504566af 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.xml
@@ -717,14 +717,14 @@ GROUP BY GROUPING SETS ((a, b), (a, b, c))
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], EXPR$2=[coalesce($2, _UTF-16LE'empty')], 
EXPR$3=[$3])
+LogicalProject(a=[$0], b=[$1], EXPR$2=[COALESCE($2, _UTF-16LE'empty')], 
EXPR$3=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}]], 
EXPR$3=[AVG($3)])
    +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, coalesce(c, 'empty') AS EXPR$2, EXPR$3])
+Calc(select=[a, b, COALESCE(c, 'empty') AS EXPR$2, EXPR$3])
 +- GroupAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, AVG(d) AS 
EXPR$3])
    +- Exchange(distribution=[hash[a, b, c, $e]])
       +- Expand(projects=[{a, b, c, d, 0 AS $e}, {a, b, null AS c, d, 1 AS 
$e}])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 71a9bd5df9a..ea2088d064e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2102,7 +2102,7 @@ GROUP BY a, ws, we
       <![CDATA[
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
 +- LogicalProject(a=[$0], ws=[$2], we=[$3])
-   +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)], we=[COALESCE($3, 
$7)], new_proctime=[PROCTIME()])
+   +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], 
new_proctime=[PROCTIME()])
       +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], joinType=[inner])
          :- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8])
          :  +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'proctime'), 300000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) 
*PROCTIME* window_time)])
@@ -3567,7 +3567,7 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
 +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6])
    +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'new_proctime'), 300000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, BIGINT b, TIMESTAMP(3) ws, TIMESTAMP(3) we, 
TIMESTAMP_LTZ(3) *PROCTIME* new_proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
       +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], new_proctime=[$4])
-         +- LogicalProject(a=[$0], b=[$1], ws=[COALESCE($2, $6)], 
we=[COALESCE($3, $7)], new_proctime=[PROCTIME()])
+         +- LogicalProject(a=[$0], b=[$1], ws=[$2], we=[$3], 
new_proctime=[PROCTIME()])
             +- LogicalJoin(condition=[AND(=($2, $6), =($3, $7))], 
joinType=[inner])
                :- LogicalProject(a=[$0], b=[$1], window_start=[$7], 
window_end=[$8])
                :  +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'proctime'), 300000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* 
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) 
*PROCTIME* window_time)])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
new file mode 100644
index 00000000000..70f6bc7db5f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/plan/calc-coalesce.json
@@ -0,0 +1,135 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "DECIMAL(2, 1)"
+            }, {
+              "name" : "b",
+              "dataType" : "DECIMAL(4, 2)"
+            }, {
+              "name" : "c",
+              "dataType" : "TIMESTAMP(0)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` DECIMAL(2, 1), `b` DECIMAL(4, 2), `c` 
TIMESTAMP(0), `d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b, c, d])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$COALESCE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "DECIMAL(2, 1)"
+        } ],
+        "type" : "DECIMAL(4, 2)"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "DECIMAL(4, 2)"
+      } ],
+      "type" : "DECIMAL(4, 2)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$COALESCE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "TIMESTAMP(0)"
+        } ],
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` DECIMAL(4, 2), `y` TIMESTAMP(3)>",
+    "description" : "Calc(select=[COALESCE(CAST(a AS DECIMAL(4, 2)), b) AS x, 
COALESCE(CAST(c AS TIMESTAMP(3)), d) AS y])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "x",
+              "dataType" : "DECIMAL(4, 2)"
+            }, {
+              "name" : "y",
+              "dataType" : "TIMESTAMP(3)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` DECIMAL(4, 2), `y` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[x, y])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
new file mode 100644
index 00000000000..e5f4d56b179
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-coalesce/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
index 59da2c17b3a..d78082ac8f2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
@@ -1,8 +1,8 @@
 {
-  "flinkVersion" : "2.0",
+  "flinkVersion" : "2.4",
   "nodes" : [ {
-    "id" : 21,
-    "type" : "stream-exec-table-source-scan_1",
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
         "identifier" : "`default_catalog`.`default_database`.`t`",
@@ -11,18 +11,15 @@
             "columns" : [ {
               "name" : "a",
               "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
+            } ]
+          }
         }
       }
     },
     "outputType" : "ROW<`a` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a])",
-    "inputProperties" : [ ]
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a])"
   }, {
-    "id" : 22,
+    "id" : 2,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "CALL",
@@ -41,8 +38,8 @@
           }
         }, {
           "kind" : "CALL",
+          "syntax" : "FUNCTION_ID",
           "internalName" : "$CURRENT_TIMESTAMP$1",
-          "operands" : [ ],
           "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
         } ],
         "type" : "BIGINT NOT NULL"
@@ -64,8 +61,8 @@
     "outputType" : "ROW<`EXPR$0` BIGINT>",
     "description" : "Calc(select=[(EXTRACT(YEAR, CURRENT_TIMESTAMP()) / a) AS 
EXPR$0])"
   }, {
-    "id" : 23,
-    "type" : "stream-exec-sink_1",
+    "id" : 3,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -81,14 +78,13 @@
             "columns" : [ {
               "name" : "a",
               "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
+            } ]
+          }
         }
       }
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -100,15 +96,15 @@
     "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[EXPR$0])"
   } ],
   "edges" : [ {
-    "source" : 21,
-    "target" : 22,
+    "source" : 1,
+    "target" : 2,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 22,
-    "target" : 23,
+    "source" : 2,
+    "target" : 3,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
index 085b8798cf8..39bef7a5568 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java
deleted file mode 100644
index 4f2253c7810..00000000000
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/CoalesceFunction.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
-
-import javax.annotation.Nullable;
-
-/** Implementation of {@link BuiltInFunctionDefinitions#COALESCE}. */
-@Internal
-public class CoalesceFunction extends BuiltInScalarFunction {
-
-    public CoalesceFunction(SpecializedContext context) {
-        super(BuiltInFunctionDefinitions.COALESCE, context);
-    }
-
-    public @Nullable Object eval(Object... args) {
-        for (Object arg : args) {
-            if (arg != null) {
-                return arg;
-            }
-        }
-        return null;
-    }
-}

Reply via email to