This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 798d06811 [FLINK-38996][transform] Enhance error messages for
projection and filtering expressions (#4243)
798d06811 is described below
commit 798d068112deaefe2e5c35d5d04c220e37ddd39a
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 30 17:30:44 2026 +0800
[FLINK-38996][transform] Enhance error messages for projection and
filtering expressions (#4243)
---
.../flink/FlinkPipelineTransformITCase.java | 11 ++++---
.../src/test/resources/specs/casting.yaml | 2 +-
.../src/test/resources/specs/logical.yaml | 2 +-
.../operators/transform/ProjectionColumn.java | 4 +++
.../transform/ProjectionColumnProcessor.java | 5 ++-
.../transform/TransformExpressionCompiler.java | 6 +++-
.../transform/TransformExpressionKey.java | 37 +++++++++++++++++++---
.../transform/TransformFilterProcessor.java | 8 +++--
.../transform/PostTransformOperatorTest.java | 5 +++
9 files changed, 65 insertions(+), 15 deletions(-)
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index a10392775..c81b1ba46 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -2737,12 +2737,12 @@ class FlinkPipelineTransformITCase {
.cause()
.isExactlyInstanceOf(FlinkRuntimeException.class)
.hasMessage(
- "Failed to compile expression
TransformExpressionKey{expression='"
+ "Failed to compile expression
TransformExpressionKey{originalExpression='id1 > 0', expression='"
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
+ "greaterThan($0, 0)',
argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class
java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean,
columnNameMap={id1=$0}}")
.cause()
.hasMessageContaining(
- "Expression: "
+ "Compiled expression: "
+ JaninoCompiler.LOAD_MODULES_EXPRESSION
+ "greaterThan($0, 0)")
.hasMessageContaining("Column name map: {$0 -> id1}")
@@ -2787,7 +2787,7 @@ class FlinkPipelineTransformITCase {
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining(
- "Failed to evaluate projection expression
`castToInteger($0) + 1` for column `new_name` in table
`default_namespace.default_schema.mytable1`")
+ "Failed to evaluate projection expression
`CAST(`TB`.`name` AS INTEGER) + 1` for column `new_name` in table
`default_namespace.default_schema.mytable1`")
.hasMessageContaining("Column name map: {$0 -> name}");
// Unsupported operations in filter rule
@@ -2808,7 +2808,10 @@ class FlinkPipelineTransformITCase {
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining(
- "Failed to evaluate filtering expression
`greaterThan($0 + 1, 0)` for table `default_namespace.default_schema.mytable1`")
+ "Failed to evaluate filtering expression for table
`default_namespace.default_schema.mytable1`.\n"
+ + "\tOriginal expression: name + 1 > 0\n"
+ + "\tCompiled expression: greaterThan($0 + 1,
0)\n"
+ + "\tColumn name map: {$0 -> name}")
.hasMessageContaining("Column name map: {$0 -> name}")
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
diff --git a/flink-cdc-composer/src/test/resources/specs/casting.yaml
b/flink-cdc-composer/src/test/resources/specs/casting.yaml
index 84a68e788..f5ce0e802 100644
--- a/flink-cdc-composer/src/test/resources/specs/casting.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/casting.yaml
@@ -268,4 +268,4 @@
id_
CAST('FOOBAR' AS TIMESTAMP(6)) AS comp_11
primary-key: id_
- expect-error: Failed to evaluate projection expression
`castToTimestamp("FOOBAR", __time_zone__)` for column `comp_11` in table
`foo.bar.baz`.
+ expect-error: Failed to evaluate projection expression `CAST('FOOBAR' AS
TIMESTAMP(6))` for column `comp_11` in table `foo.bar.baz`.
diff --git a/flink-cdc-composer/src/test/resources/specs/logical.yaml
b/flink-cdc-composer/src/test/resources/specs/logical.yaml
index cc825f7a6..4f09aa823 100644
--- a/flink-cdc-composer/src/test/resources/specs/logical.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/logical.yaml
@@ -45,4 +45,4 @@
bool_ IS FALSE AS comp_8
bool_ IS NOT FALSE AS comp_9
primary-key: id_
- expect-error: 'java.lang.RuntimeException: Failed to evaluate projection
expression `$0 || false` for column `comp_2` in table `foo.bar.baz`.'
+ expect-error: 'java.lang.RuntimeException: Failed to evaluate projection
expression ``TB`.`bool_` OR FALSE` for column `comp_2` in table `foo.bar.baz`.'
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
index 5c3b17a43..fdbaf2678 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
@@ -87,6 +87,10 @@ public class ProjectionColumn implements Serializable {
return column.getType();
}
+ public String getExpression() {
+ return expression;
+ }
+
public String getScriptExpression() {
return scriptExpression;
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index cf62cc855..f2cfc4256 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -89,10 +89,12 @@ public class ProjectionColumnProcessor {
throw new RuntimeException(
String.format(
"Failed to evaluate projection expression `%s` for
column `%s` in table `%s`.\n"
+ + "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
- projectionColumn.getScriptExpression(),
+ projectionColumn.getExpression(),
projectionColumn.getColumnName(),
tableInfo.getName(),
+ projectionColumn.getScriptExpression(),
projectionColumn.getColumnNameMapAsString()),
e);
}
@@ -168,6 +170,7 @@ public class ProjectionColumnProcessor {
paramTypes.add(Long.class);
return TransformExpressionKey.of(
+ projectionColumn.getExpression(),
JaninoCompiler.loadSystemFunction(scriptExpression),
argumentNames,
paramTypes,
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index 4c6cb71e1..bc496faf9 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -77,7 +77,11 @@ public class TransformExpressionCompiler {
} catch (CompileException e) {
throw new InvalidProgramException(
String.format(
- "Expression cannot be compiled.
This is a bug. Please file an issue.\n\tExpression: %s\n\tColumn name map:
{%s}",
+ "Expression cannot be compiled.
This is a bug. Please file an issue.\n"
+ + "\tOriginal expression:
%s\n"
+ + "\tCompiled expression:
%s\n"
+ + "\tColumn name map:
{%s}",
+ key.getOriginalExpression(),
key.getExpression(),
TransformException.prettyPrintColumnNameMap(
key.getColumnNameMap())),
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 6ece097fc..75cb13b66 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.runtime.operators.transform;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
@@ -29,7 +31,8 @@ import java.util.Objects;
* <p>A transform expression key contains:
*
* <ul>
- * <li>expression: a string for the transformation expression.
+ * <li>originalExpression: a string for the original transformation
expression input by users.
+ * <li>expression: a string for the compiled transformation expression.
* <li>argumentNames: a list for the argument names in expression.
* <li>argumentClasses: a list for the argument classes in expression.
* <li>returnClass: a class for the return class in expression
@@ -39,6 +42,7 @@ import java.util.Objects;
*/
public class TransformExpressionKey implements Serializable {
private static final long serialVersionUID = 1L;
+ @Nullable private final String originalExpression;
private final String expression;
private final List<String> argumentNames;
private final List<Class<?>> argumentClasses;
@@ -46,11 +50,13 @@ public class TransformExpressionKey implements Serializable
{
private final Map<String, String> columnNameMap;
private TransformExpressionKey(
+ @Nullable String originalExpression,
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
+ this.originalExpression = originalExpression;
this.expression = expression;
this.argumentNames = argumentNames;
this.argumentClasses = argumentClasses;
@@ -58,6 +64,11 @@ public class TransformExpressionKey implements Serializable {
this.columnNameMap = columnNameMap;
}
+ @Nullable
+ public String getOriginalExpression() {
+ return originalExpression;
+ }
+
public String getExpression() {
return expression;
}
@@ -79,13 +90,19 @@ public class TransformExpressionKey implements Serializable
{
}
public static TransformExpressionKey of(
+ @Nullable String originalExpression,
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
return new TransformExpressionKey(
- expression, argumentNames, argumentClasses, returnClass,
columnNameMap);
+ originalExpression,
+ expression,
+ argumentNames,
+ argumentClasses,
+ returnClass,
+ columnNameMap);
}
@Override
@@ -97,7 +114,8 @@ public class TransformExpressionKey implements Serializable {
return false;
}
TransformExpressionKey that = (TransformExpressionKey) o;
- return expression.equals(that.expression)
+ return Objects.equals(originalExpression, that.originalExpression)
+ && expression.equals(that.expression)
&& argumentNames.equals(that.argumentNames)
&& argumentClasses.equals(that.argumentClasses)
&& returnClass.equals(that.returnClass)
@@ -106,13 +124,22 @@ public class TransformExpressionKey implements
Serializable {
@Override
public int hashCode() {
- return Objects.hash(expression, argumentNames, argumentClasses,
returnClass, columnNameMap);
+ return Objects.hash(
+ originalExpression,
+ expression,
+ argumentNames,
+ argumentClasses,
+ returnClass,
+ columnNameMap);
}
@Override
public String toString() {
return "TransformExpressionKey{"
- + "expression='"
+ + "originalExpression='"
+ + originalExpression
+ + '\''
+ + ", expression='"
+ expression
+ '\''
+ ", argumentNames="
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 77b01759e..299266d1b 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -112,10 +112,13 @@ public class TransformFilterProcessor {
} catch (InvocationTargetException e) {
throw new RuntimeException(
String.format(
- "Failed to evaluate filtering expression `%s` for
table `%s`.\n"
+ "Failed to evaluate filtering expression for table
`%s`.\n"
+ + "\tOriginal expression: %s\n"
+ + "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
- transformFilter.getScriptExpression(),
tableInfo.getName(),
+ transformFilter.getExpression(),
+ transformFilter.getScriptExpression(),
transformFilter.getColumnNameMapAsString()),
e);
}
@@ -206,6 +209,7 @@ public class TransformFilterProcessor {
args.f1.add(Long.class);
return TransformExpressionKey.of(
+ transformFilter.getExpression(),
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
args.f0,
args.f1,
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 20cae3a20..e50ed0ca7 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -2767,7 +2767,12 @@ class PostTransformOperatorTest {
+ "\tcolumns={`col1` STRING NOT NULL,`castInt`
INT,`castBoolean` BOOLEAN,`castTinyint` TINYINT,`castSmallint`
SMALLINT,`castBigint` BIGINT,`castFloat` FLOAT,`castDouble` DOUBLE,`castChar`
STRING,`castVarchar` STRING,`castDecimal` DECIMAL(4, 2),`castTimestamp`
TIMESTAMP(3)}, primaryKeys=col1, options=().")
.cause()
.hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Failed to evaluate projection expression
`CAST(`TB`.`castFloat` AS TIMESTAMP(3))` for column `castTimestamp` in table
`my_company.my_branch.data_cast`.\n"
+ + "\tCompiled expression: castToTimestamp($0,
__time_zone__)\n"
+ + "\tColumn name map: {$0 -> castFloat}")
.hasRootCauseMessage("Unable to parse given string as
timestamp: 1.0");
+
transformFunctionEventEventOperatorTestHarness.close();
}