This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 17d21a957 [FLINK-35984][cdc-runtime] Fix bug where metadata column names
cannot be used in transform rules
17d21a957 is described below
commit 17d21a957a4e73648788c5abe493bf7be6b63a5b
Author: MOBIN <[email protected]>
AuthorDate: Mon Aug 12 14:34:10 2024 +0800
[FLINK-35984][cdc-runtime] Fix bug where metadata column names cannot be
used in transform rules
This closes #3528.
Co-authored-by: yuxiqian <[email protected]>
---
.../transform/ProjectionColumnProcessor.java | 30 ++++++-------
.../flink/cdc/runtime/parser/TransformParser.java | 26 +++--------
.../transform/UnifiedTransformOperatorTest.java | 51 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 35 deletions(-)
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index a27af2370..cbe290dcb 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -156,22 +156,22 @@ public class ProjectionColumnProcessor {
}
}
}
- if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME)
- &&
!argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
- paramTypes.add(String.class);
- }
-
- if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME)
- &&
!argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
- paramTypes.add(String.class);
- }
- if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME)
- &&
!argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) {
- argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
- paramTypes.add(String.class);
+ for (String originalColumnName : originalColumnNames) {
+ switch (originalColumnName) {
+ case TransformParser.DEFAULT_NAMESPACE_NAME:
+ argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
+ paramTypes.add(String.class);
+ break;
+ case TransformParser.DEFAULT_SCHEMA_NAME:
+ argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
+ paramTypes.add(String.class);
+ break;
+ case TransformParser.DEFAULT_TABLE_NAME:
+ argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
+ paramTypes.add(String.class);
+ break;
+ }
}
argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 6260bf7b0..ae598b7da 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -109,7 +109,7 @@ public class TransformParser {
List<Column> columns,
SqlNode sqlNode,
List<UserDefinedFunctionDescriptor> udfDescriptors) {
- List<Column> columnsWithMetadata =
copyFillMetadataColumn(sqlNode.toString(), columns);
+ List<Column> columnsWithMetadata = copyFillMetadataColumn(columns);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
SchemaPlus schema = rootSchema.plus();
Map<String, Object> operand = new HashMap<>();
@@ -498,29 +498,15 @@ public class TransformParser {
return parseSelect(statement.toString());
}
- private static List<Column> copyFillMetadataColumn(
- String transformStatement, List<Column> columns) {
+ private static List<Column> copyFillMetadataColumn(List<Column> columns) {
+ // Add metaColumn for SQLValidator.validate
List<Column> columnsWithMetadata = new ArrayList<>(columns);
- if (transformStatement.contains(DEFAULT_NAMESPACE_NAME)
- && !containsMetadataColumn(columnsWithMetadata,
DEFAULT_NAMESPACE_NAME)) {
- columnsWithMetadata.add(
- Column.physicalColumn(DEFAULT_NAMESPACE_NAME,
DataTypes.STRING()));
- }
- if (transformStatement.contains(DEFAULT_SCHEMA_NAME)
- && !containsMetadataColumn(columnsWithMetadata,
DEFAULT_SCHEMA_NAME)) {
- columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME,
DataTypes.STRING()));
- }
- if (transformStatement.contains(DEFAULT_TABLE_NAME)
- && !containsMetadataColumn(columnsWithMetadata,
DEFAULT_TABLE_NAME)) {
- columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME,
DataTypes.STRING()));
- }
+ columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME,
DataTypes.STRING()));
+ columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME,
DataTypes.STRING()));
+ columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME,
DataTypes.STRING()));
return columnsWithMetadata;
}
- private static boolean containsMetadataColumn(List<Column> columns, String
columnName) {
- return columns.stream().anyMatch(column ->
column.getName().equals(columnName));
- }
-
private static boolean isMetadataColumn(String columnName) {
return DEFAULT_TABLE_NAME.equals(columnName)
|| DEFAULT_SCHEMA_NAME.equals(columnName)
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index 009d700fe..43a364e51 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -917,6 +917,57 @@ public class UnifiedTransformOperatorTest {
.destroyHarness();
}
+ @Test
+ public void testMetadataTransformIncludeMetaColumnString() throws
Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch",
"schema_nullability");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "id, name, age, id + age as computed,
__namespace_name__ as metaColNameSpaceName, __schema_name__ as
metaColSchemaName, __table_name__ as metaColNameTableName, "
+ + "UPPER(__schema_name__) as
metaColSchemaNameUpper, '__table_name__' as metaColStr1,
'__namespace__name__schema__name__table__name__' as metaColStr2",
+ "id > 100",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name",
DataTypes.STRING().notNull())
+ .physicalColumn("age",
DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name",
DataTypes.STRING().notNull())
+ .physicalColumn("age",
DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name",
DataTypes.STRING().notNull())
+ .physicalColumn("age",
DataTypes.INT().notNull())
+ .physicalColumn("computed", DataTypes.INT())
+ .physicalColumn("metaColNameSpaceName",
DataTypes.STRING())
+ .physicalColumn("metaColSchemaName",
DataTypes.STRING())
+ .physicalColumn("metaColNameTableName",
DataTypes.STRING())
+ .physicalColumn("metaColSchemaNameUpper",
DataTypes.STRING())
+ .physicalColumn("metaColStr1",
DataTypes.STRING())
+ .physicalColumn("metaColStr2",
DataTypes.STRING())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .insertSource(1000, "Alice", 17)
+ .insertPreTransformed(1000, "Alice", 17)
+ .insertPostTransformed(
+ 1000,
+ "Alice",
+ 17,
+ 1017,
+ "my_company",
+ "my_branch",
+ "schema_nullability",
+ "MY_BRANCH",
+ "__table_name__",
+ "__namespace__name__schema__name__table__name__")
+ .runTests()
+ .destroyHarness();
+ }
+
@Test
public void testTransformWithCast() throws Exception {
TableId tableId = TableId.tableId("my_company", "my_branch",
"transform_with_cast");