This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 798d06811 [FLINK-38996][transform] Enhance error messages for 
projection and filtering expressions (#4243)
798d06811 is described below

commit 798d068112deaefe2e5c35d5d04c220e37ddd39a
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 30 17:30:44 2026 +0800

    [FLINK-38996][transform] Enhance error messages for projection and 
filtering expressions (#4243)
---
 .../flink/FlinkPipelineTransformITCase.java        | 11 ++++---
 .../src/test/resources/specs/casting.yaml          |  2 +-
 .../src/test/resources/specs/logical.yaml          |  2 +-
 .../operators/transform/ProjectionColumn.java      |  4 +++
 .../transform/ProjectionColumnProcessor.java       |  5 ++-
 .../transform/TransformExpressionCompiler.java     |  6 +++-
 .../transform/TransformExpressionKey.java          | 37 +++++++++++++++++++---
 .../transform/TransformFilterProcessor.java        |  8 +++--
 .../transform/PostTransformOperatorTest.java       |  5 +++
 9 files changed, 65 insertions(+), 15 deletions(-)

diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index a10392775..c81b1ba46 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -2737,12 +2737,12 @@ class FlinkPipelineTransformITCase {
                 .cause()
                 .isExactlyInstanceOf(FlinkRuntimeException.class)
                 .hasMessage(
-                        "Failed to compile expression 
TransformExpressionKey{expression='"
+                        "Failed to compile expression 
TransformExpressionKey{originalExpression='id1 > 0', expression='"
                                 + JaninoCompiler.LOAD_MODULES_EXPRESSION
                                 + "greaterThan($0, 0)', 
argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class 
java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, 
columnNameMap={id1=$0}}")
                 .cause()
                 .hasMessageContaining(
-                        "Expression: "
+                        "Compiled expression: "
                                 + JaninoCompiler.LOAD_MODULES_EXPRESSION
                                 + "greaterThan($0, 0)")
                 .hasMessageContaining("Column name map: {$0 -> id1}")
@@ -2787,7 +2787,7 @@ class FlinkPipelineTransformITCase {
                 .cause()
                 .isExactlyInstanceOf(RuntimeException.class)
                 .hasMessageContaining(
-                        "Failed to evaluate projection expression 
`castToInteger($0) + 1` for column `new_name` in table 
`default_namespace.default_schema.mytable1`")
+                        "Failed to evaluate projection expression 
`CAST(`TB`.`name` AS INTEGER) + 1` for column `new_name` in table 
`default_namespace.default_schema.mytable1`")
                 .hasMessageContaining("Column name map: {$0 -> name}");
 
         // Unsupported operations in filter rule
@@ -2808,7 +2808,10 @@ class FlinkPipelineTransformITCase {
                 .cause()
                 .isExactlyInstanceOf(RuntimeException.class)
                 .hasMessageContaining(
-                        "Failed to evaluate filtering expression 
`greaterThan($0 + 1, 0)` for table `default_namespace.default_schema.mytable1`")
+                        "Failed to evaluate filtering expression for table 
`default_namespace.default_schema.mytable1`.\n"
+                                + "\tOriginal expression: name + 1 > 0\n"
+                                + "\tCompiled expression: greaterThan($0 + 1, 
0)\n"
+                                + "\tColumn name map: {$0 -> name}")
                 .hasMessageContaining("Column name map: {$0 -> name}")
                 .rootCause()
                 .isExactlyInstanceOf(RuntimeException.class)
diff --git a/flink-cdc-composer/src/test/resources/specs/casting.yaml 
b/flink-cdc-composer/src/test/resources/specs/casting.yaml
index 84a68e788..f5ce0e802 100644
--- a/flink-cdc-composer/src/test/resources/specs/casting.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/casting.yaml
@@ -268,4 +268,4 @@
     id_
     CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11
   primary-key: id_
-  expect-error: Failed to evaluate projection expression 
`castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table 
`foo.bar.baz`.
+  expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS 
TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`.
diff --git a/flink-cdc-composer/src/test/resources/specs/logical.yaml 
b/flink-cdc-composer/src/test/resources/specs/logical.yaml
index cc825f7a6..4f09aa823 100644
--- a/flink-cdc-composer/src/test/resources/specs/logical.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/logical.yaml
@@ -45,4 +45,4 @@
     bool_ IS FALSE AS comp_8
     bool_ IS NOT FALSE AS comp_9
   primary-key: id_
-  expect-error: 'java.lang.RuntimeException: Failed to evaluate projection 
expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.'
+  expect-error: 'java.lang.RuntimeException: Failed to evaluate projection 
expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.'
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
index 5c3b17a43..fdbaf2678 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
@@ -87,6 +87,10 @@ public class ProjectionColumn implements Serializable {
         return column.getType();
     }
 
+    public String getExpression() {
+        return expression;
+    }
+
     public String getScriptExpression() {
         return scriptExpression;
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index cf62cc855..f2cfc4256 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -89,10 +89,12 @@ public class ProjectionColumnProcessor {
             throw new RuntimeException(
                     String.format(
                             "Failed to evaluate projection expression `%s` for 
column `%s` in table `%s`.\n"
+                                    + "\tCompiled expression: %s\n"
                                     + "\tColumn name map: {%s}",
-                            projectionColumn.getScriptExpression(),
+                            projectionColumn.getExpression(),
                             projectionColumn.getColumnName(),
                             tableInfo.getName(),
+                            projectionColumn.getScriptExpression(),
                             projectionColumn.getColumnNameMapAsString()),
                     e);
         }
@@ -168,6 +170,7 @@ public class ProjectionColumnProcessor {
         paramTypes.add(Long.class);
 
         return TransformExpressionKey.of(
+                projectionColumn.getExpression(),
                 JaninoCompiler.loadSystemFunction(scriptExpression),
                 argumentNames,
                 paramTypes,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index 4c6cb71e1..bc496faf9 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -77,7 +77,11 @@ public class TransformExpressionCompiler {
                         } catch (CompileException e) {
                             throw new InvalidProgramException(
                                     String.format(
-                                            "Expression cannot be compiled. 
This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map: 
{%s}",
+                                            "Expression cannot be compiled. 
This is a bug. Please file an issue.\n"
+                                                    + "\tOriginal expression: 
%s\n"
+                                                    + "\tCompiled expression: 
%s\n"
+                                                    + "\tColumn name map: 
{%s}",
+                                            key.getOriginalExpression(),
                                             key.getExpression(),
                                             
TransformException.prettyPrintColumnNameMap(
                                                     key.getColumnNameMap())),
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 6ece097fc..75cb13b66 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
@@ -29,7 +31,8 @@ import java.util.Objects;
  * <p>A transform expression key contains:
  *
  * <ul>
- *   <li>expression: a string for the transformation expression.
+ *   <li>originalExpression: a string for the original transformation 
expression input by users.
+ *   <li>expression: a string for the compiled transformation expression.
  *   <li>argumentNames: a list for the argument names in expression.
  *   <li>argumentClasses: a list for the argument classes in expression.
  *   <li>returnClass: a class for the return class in expression
@@ -39,6 +42,7 @@ import java.util.Objects;
  */
 public class TransformExpressionKey implements Serializable {
     private static final long serialVersionUID = 1L;
+    @Nullable private final String originalExpression;
     private final String expression;
     private final List<String> argumentNames;
     private final List<Class<?>> argumentClasses;
@@ -46,11 +50,13 @@ public class TransformExpressionKey implements Serializable 
{
     private final Map<String, String> columnNameMap;
 
     private TransformExpressionKey(
+            @Nullable String originalExpression,
             String expression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
             Class<?> returnClass,
             Map<String, String> columnNameMap) {
+        this.originalExpression = originalExpression;
         this.expression = expression;
         this.argumentNames = argumentNames;
         this.argumentClasses = argumentClasses;
@@ -58,6 +64,11 @@ public class TransformExpressionKey implements Serializable {
         this.columnNameMap = columnNameMap;
     }
 
+    @Nullable
+    public String getOriginalExpression() {
+        return originalExpression;
+    }
+
     public String getExpression() {
         return expression;
     }
@@ -79,13 +90,19 @@ public class TransformExpressionKey implements Serializable 
{
     }
 
     public static TransformExpressionKey of(
+            @Nullable String originalExpression,
             String expression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
             Class<?> returnClass,
             Map<String, String> columnNameMap) {
         return new TransformExpressionKey(
-                expression, argumentNames, argumentClasses, returnClass, 
columnNameMap);
+                originalExpression,
+                expression,
+                argumentNames,
+                argumentClasses,
+                returnClass,
+                columnNameMap);
     }
 
     @Override
@@ -97,7 +114,8 @@ public class TransformExpressionKey implements Serializable {
             return false;
         }
         TransformExpressionKey that = (TransformExpressionKey) o;
-        return expression.equals(that.expression)
+        return Objects.equals(originalExpression, that.originalExpression)
+                && expression.equals(that.expression)
                 && argumentNames.equals(that.argumentNames)
                 && argumentClasses.equals(that.argumentClasses)
                 && returnClass.equals(that.returnClass)
@@ -106,13 +124,22 @@ public class TransformExpressionKey implements 
Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(expression, argumentNames, argumentClasses, 
returnClass, columnNameMap);
+        return Objects.hash(
+                originalExpression,
+                expression,
+                argumentNames,
+                argumentClasses,
+                returnClass,
+                columnNameMap);
     }
 
     @Override
     public String toString() {
         return "TransformExpressionKey{"
-                + "expression='"
+                + "originalExpression='"
+                + originalExpression
+                + '\''
+                + ", expression='"
                 + expression
                 + '\''
                 + ", argumentNames="
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 77b01759e..299266d1b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -112,10 +112,13 @@ public class TransformFilterProcessor {
         } catch (InvocationTargetException e) {
             throw new RuntimeException(
                     String.format(
-                            "Failed to evaluate filtering expression `%s` for 
table `%s`.\n"
+                            "Failed to evaluate filtering expression for table 
`%s`.\n"
+                                    + "\tOriginal expression: %s\n"
+                                    + "\tCompiled expression: %s\n"
                                     + "\tColumn name map: {%s}",
-                            transformFilter.getScriptExpression(),
                             tableInfo.getName(),
+                            transformFilter.getExpression(),
+                            transformFilter.getScriptExpression(),
                             transformFilter.getColumnNameMapAsString()),
                     e);
         }
@@ -206,6 +209,7 @@ public class TransformFilterProcessor {
         args.f1.add(Long.class);
 
         return TransformExpressionKey.of(
+                transformFilter.getExpression(),
                 
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
                 args.f0,
                 args.f1,
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 20cae3a20..e50ed0ca7 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -2767,7 +2767,12 @@ class PostTransformOperatorTest {
                                 + "\tcolumns={`col1` STRING NOT NULL,`castInt` 
INT,`castBoolean` BOOLEAN,`castTinyint` TINYINT,`castSmallint` 
SMALLINT,`castBigint` BIGINT,`castFloat` FLOAT,`castDouble` DOUBLE,`castChar` 
STRING,`castVarchar` STRING,`castDecimal` DECIMAL(4, 2),`castTimestamp` 
TIMESTAMP(3)}, primaryKeys=col1, options=().")
                 .cause()
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Failed to evaluate projection expression 
`CAST(`TB`.`castFloat` AS TIMESTAMP(3))` for column `castTimestamp` in table 
`my_company.my_branch.data_cast`.\n"
+                                + "\tCompiled expression: castToTimestamp($0, 
__time_zone__)\n"
+                                + "\tColumn name map: {$0 -> castFloat}")
                 .hasRootCauseMessage("Unable to parse given string as 
timestamp: 1.0");
+
         transformFunctionEventEventOperatorTestHarness.close();
     }
 

Reply via email to