This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3cb91fb51 [FLINK-35805][transform] Add __data_event_type__ metadata column
3cb91fb51 is described below
commit 3cb91fb51acd3228a0cb39fe514bc7c23a4883b5
Author: yuxiqian <[email protected]>
AuthorDate: Tue Aug 13 20:07:10 2024 +0800
[FLINK-35805][transform] Add __data_event_type__ metadata column
This closes #3468
---
docs/content.zh/docs/core-concept/transform.md | 9 ++--
docs/content/docs/core-concept/transform.md | 9 ++--
.../flink/FlinkPipelineComposerITCase.java | 60 ++++++++++++++++++++++
.../cdc/pipeline/tests/TransformE2eITCase.java | 30 +++++------
.../operators/transform/PostTransformOperator.java | 18 +++++--
.../transform/ProjectionColumnProcessor.java | 43 ++++++++--------
.../transform/TransformFilterProcessor.java | 36 +++++++------
.../transform/TransformProjectionProcessor.java | 4 +-
.../flink/cdc/runtime/parser/TransformParser.java | 15 ++----
.../runtime/parser/metadata/MetadataColumns.java | 40 +++++++++++++++
10 files changed, 186 insertions(+), 78 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md
b/docs/content.zh/docs/core-concept/transform.md
index 62f8f210e..f3a11ea7c 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML
file.
There are some hidden columns used to access metadata information. They will
only take effect when explicitly referenced in the transform rules.
| Field | Data Type | Description
|
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String | Name of the namespace that contains the
row. |
-| __schema_name__ | String | Name of the schema that contains the row.
|
-| __table_name__ | String | Name of the table that contains the row.
|
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__ | String | Name of the namespace that contains the
row. |
+| __schema_name__ | String | Name of the schema that contains the row.
|
+| __table_name__ | String | Name of the table that contains the row.
|
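+| __data_event_type__ | String | Operation type of the data change event that contains the row: `+I` (insert), `-U` / `+U` (update, before / after image), or `-D` (delete). |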
## Metadata relationship
diff --git a/docs/content/docs/core-concept/transform.md
b/docs/content/docs/core-concept/transform.md
index 62f8f210e..f3a11ea7c 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML
file.
There are some hidden columns used to access metadata information. They will
only take effect when explicitly referenced in the transform rules.
| Field | Data Type | Description
|
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String | Name of the namespace that contains the
row. |
-| __schema_name__ | String | Name of the schema that contains the row.
|
-| __table_name__ | String | Name of the table that contains the row.
|
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__ | String | Name of the namespace that contains the
row. |
+| __schema_name__ | String | Name of the schema that contains the row.
|
+| __table_name__ | String | Name of the table that contains the row.
|
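+| __data_event_type__ | String | Operation type of the data change event that contains the row: `+I` (insert), `-U` / `+U` (update, before / after image), or `-D` (delete). |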
## Metadata relationship
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index eafb4035e..45cdbe9e8 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -340,6 +340,66 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ,
20], after=[2, x, 20], op=UPDATE, meta=()}");
}
+ @ParameterizedTest
+ @EnumSource
+ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup transform
+ TransformDef transformDef =
+ new TransformDef(
+ "default_namespace.default_schema.table1",
+ "*,concat(col1,'0') as col12,__data_event_type__ as
rk",
+ "col1 <> '3'",
+ "col1",
+ "col12",
+ "key1=value1",
+ "");
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ new
ArrayList<>(Collections.singletonList(transformDef)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING},
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[1, 1, 10, +I], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, 20, +I], op=INSERT, meta=()}",
+
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST,
existedColumnName=null}]}",
+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1,
nameMapping={col2=newCol2, col3=newCol3}}",
+
"DropColumnEvent{tableId=default_namespace.default_schema.table1,
droppedColumnNames=[newCol2]}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1,
10, -D], after=[], op=DELETE, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ,
20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
+ }
+
@ParameterizedTest
@EnumSource
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 0dad9f363..e27977153 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -442,9 +442,9 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
- + " projection: \\*, __namespace_name__ ||
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ + " projection: \\*, __namespace_name__ ||
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name,
__data_event_type__ AS type\n"
+ " - source-table: %s.TABLEBETA\n"
- + " projection: \\*, __namespace_name__ ||
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ + " projection: \\*, __namespace_name__ ||
'.' || __schema_name__ || '.' || __table_name__ AS identifier_name,
__data_event_type__ AS type\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
@@ -462,25 +462,25 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
waitUntilSpecificEvent(
String.format(
- "CreateTableEvent{tableId=%s.TABLEALPHA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA`
INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING},
primaryKeys=ID, options=()}",
+ "CreateTableEvent{tableId=%s.TABLEALPHA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA`
INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type`
STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);
waitUntilSpecificEvent(
String.format(
- "CreateTableEvent{tableId=%s.TABLEBETA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA`
VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING},
primaryKeys=ID, options=()}",
+ "CreateTableEvent{tableId=%s.TABLEBETA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA`
VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name`
STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);
validateEvents(
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012,
12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011,
11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014,
14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013,
13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012,
12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011,
11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014,
14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013,
13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");
// generate binlogs
String mysqlJdbcUrl =
@@ -492,9 +492,9 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
insertBinlogEvents(mysqlJdbcUrl);
validateEvents(
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0,
18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob,
null.%s.TABLEALPHA], op=UPDATE, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big
Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0,
18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob,
null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEALPHA, before=[],
after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big
Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
}
private static void insertBinlogEvents(String mysqlJdbcUrl) throws
SQLException {
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index a4f938d15..36955f8e1 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -23,6 +23,7 @@ import
org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -389,13 +390,15 @@ public class PostTransformOperator extends
AbstractStreamOperator<Event>
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
// insert and update event only process afterData, delete only process
beforeData
if (after != null) {
- if (transformFilterProcessor.process(after, epochTime)) {
+ if (transformFilterProcessor.process(
+ after, epochTime, opTypeToRowKind(dataChangeEvent.op(),
'+'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
}
} else if (before != null) {
- if (transformFilterProcessor.process(before, epochTime)) {
+ if (transformFilterProcessor.process(
+ before, epochTime, opTypeToRowKind(dataChangeEvent.op(),
'-'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
@@ -412,11 +415,14 @@ public class PostTransformOperator extends
AbstractStreamOperator<Event>
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
- postTransformProcessor.processData(before, epochTime);
+ postTransformProcessor.processData(
+ before, epochTime,
opTypeToRowKind(dataChangeEvent.op(), '-'));
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent,
projectedBefore);
}
if (after != null) {
- BinaryRecordData projectedAfter =
postTransformProcessor.processData(after, epochTime);
+ BinaryRecordData projectedAfter =
+ postTransformProcessor.processData(
+ after, epochTime,
opTypeToRowKind(dataChangeEvent.op(), '+'));
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent,
projectedAfter);
}
return Optional.of(dataChangeEvent);
@@ -499,4 +505,8 @@ public class PostTransformOperator extends
AbstractStreamOperator<Event>
}
});
}
+
+ private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
+ return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index cbe290dcb..ee7740d9e 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -21,7 +21,7 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.codehaus.janino.ExpressionEvaluator;
@@ -33,6 +33,8 @@ import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
+import static
org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
+
/**
* The processor of the projection column. It processes the data column and
the user-defined
* computed columns.
@@ -79,9 +81,9 @@ public class ProjectionColumnProcessor {
return projectionColumn;
}
- public Object evaluate(BinaryRecordData after, long epochTime) {
+ public Object evaluate(BinaryRecordData record, long epochTime, String
opType) {
try {
- return expressionEvaluator.evaluate(generateParams(after,
epochTime));
+ return expressionEvaluator.evaluate(generateParams(record,
epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} column:{} projection:{} execute failed. {}",
@@ -93,7 +95,7 @@ public class ProjectionColumnProcessor {
}
}
- private Object[] generateParams(BinaryRecordData after, long epochTime) {
+ private Object[] generateParams(BinaryRecordData record, long epochTime,
String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
@@ -103,15 +105,18 @@ public class ProjectionColumnProcessor {
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
- case TransformParser.DEFAULT_NAMESPACE_NAME:
+ case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
- case TransformParser.DEFAULT_SCHEMA_NAME:
+ case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
- case TransformParser.DEFAULT_TABLE_NAME:
+ case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
+ case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
+ params.add(opType);
+ continue;
}
boolean argumentFound = false;
@@ -120,7 +125,7 @@ public class ProjectionColumnProcessor {
if (column.getName().equals(originalColumnName)) {
params.add(
DataTypeConverter.convertToOriginal(
- fieldGetters[i].getFieldOrNull(after),
column.getType()));
+ fieldGetters[i].getFieldOrNull(record),
column.getType()));
argumentFound = true;
break;
}
@@ -158,20 +163,14 @@ public class ProjectionColumnProcessor {
}
for (String originalColumnName : originalColumnNames) {
- switch (originalColumnName) {
- case TransformParser.DEFAULT_NAMESPACE_NAME:
- argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
- paramTypes.add(String.class);
- break;
- case TransformParser.DEFAULT_SCHEMA_NAME:
- argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
- paramTypes.add(String.class);
- break;
- case TransformParser.DEFAULT_TABLE_NAME:
- argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
- paramTypes.add(String.class);
- break;
- }
+ METADATA_COLUMNS.stream()
+ .filter(col -> col.f0.equals(originalColumnName))
+ .findFirst()
+ .ifPresent(
+ col -> {
+ argumentNames.add(col.f0);
+ paramTypes.add(col.f2);
+ });
}
argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index d1f67818b..430813d7f 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
+import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.codehaus.janino.ExpressionEvaluator;
@@ -32,11 +33,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.stream.Stream;
-import static
org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_NAMESPACE_NAME;
-import static
org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_SCHEMA_NAME;
-import static
org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_TABLE_NAME;
+import static
org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
/** The processor of the transform filter. It processes the data change event
of matched table. */
public class TransformFilterProcessor {
@@ -74,9 +72,10 @@ public class TransformFilterProcessor {
tableInfo, transformFilter, timezone, udfDescriptors,
udfFunctionInstances);
}
- public boolean process(BinaryRecordData after, long epochTime) {
+ public boolean process(BinaryRecordData record, long epochTime, String
opType) {
try {
- return (Boolean)
expressionEvaluator.evaluate(generateParams(after, epochTime));
+ return (Boolean)
+ expressionEvaluator.evaluate(generateParams(record,
epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} filter:{} execute failed. {}",
@@ -102,19 +101,19 @@ public class TransformFilterProcessor {
}
}
}
- Stream.of(DEFAULT_NAMESPACE_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_TABLE_NAME)
+
+ METADATA_COLUMNS.stream()
.forEach(
- metadataColumn -> {
- if (scriptExpression.contains(metadataColumn)
- && !argNames.contains(metadataColumn)) {
- argNames.add(metadataColumn);
- argTypes.add(String.class);
+ col -> {
+ if (scriptExpression.contains(col.f0) &&
!argNames.contains(col.f0)) {
+ argNames.add(col.f0);
+ argTypes.add(col.f2);
}
});
return Tuple2.of(argNames, argTypes);
}
- private Object[] generateParams(BinaryRecordData after, long epochTime) {
+ private Object[] generateParams(BinaryRecordData record, long epochTime,
String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
@@ -123,22 +122,25 @@ public class TransformFilterProcessor {
RecordData.FieldGetter[] fieldGetters =
tableInfo.getPreTransformedFieldGetters();
for (String columnName : args.f0) {
switch (columnName) {
- case DEFAULT_NAMESPACE_NAME:
+ case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
- case DEFAULT_SCHEMA_NAME:
+ case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
- case DEFAULT_TABLE_NAME:
+ case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
+ case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
+ params.add(opType);
+ continue;
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if (column.getName().equals(columnName)) {
params.add(
DataTypeConverter.convertToOriginal(
- fieldGetters[i].getFieldOrNull(after),
column.getType()));
+ fieldGetters[i].getFieldOrNull(record),
column.getType()));
break;
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index 452ac2238..45ea35770 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -113,7 +113,7 @@ public class TransformProjectionProcessor {
.collect(Collectors.toList()));
}
- public BinaryRecordData processData(BinaryRecordData payload, long
epochTime) {
+ public BinaryRecordData processData(BinaryRecordData payload, long
epochTime, String opType) {
List<Object> valueList = new ArrayList<>();
List<Column> columns =
postTransformChangeInfo.getPostTransformedSchema().getColumns();
@@ -124,7 +124,7 @@ public class TransformProjectionProcessor {
ProjectionColumn projectionColumn =
projectionColumnProcessor.getProjectionColumn();
valueList.add(
DataTypeConverter.convert(
- projectionColumnProcessor.evaluate(payload,
epochTime),
+ projectionColumnProcessor.evaluate(payload,
epochTime, opType),
projectionColumn.getDataType()));
} else {
Column column = columns.get(i);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ea7bf4c5c..c7a1b718e 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -20,7 +20,6 @@ package org.apache.flink.cdc.runtime.parser;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import
org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
@@ -84,6 +83,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.flink.cdc.common.utils.StringUtils.isNullOrWhitespaceOnly;
+import static
org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
import static
org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.convertCalciteType;
/** Use Flink's calcite parser to parse the statement of flink cdc pipeline
transform. */
@@ -91,9 +91,6 @@ public class TransformParser {
private static final Logger LOG =
LoggerFactory.getLogger(TransformParser.class);
private static final String DEFAULT_SCHEMA = "default_schema";
private static final String DEFAULT_TABLE = "TB";
- public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
- public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
- public static final String DEFAULT_TABLE_NAME = "__table_name__";
private static SqlParser getCalciteParser(String sql) {
return SqlParser.create(
@@ -497,16 +494,14 @@ public class TransformParser {
private static List<Column> copyFillMetadataColumn(List<Column> columns) {
// Add metaColumn for SQLValidator.validate
List<Column> columnsWithMetadata = new ArrayList<>(columns);
- columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME,
DataTypes.STRING()));
- columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME,
DataTypes.STRING()));
- columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME,
DataTypes.STRING()));
+ METADATA_COLUMNS.stream()
+ .map(col -> Column.physicalColumn(col.f0, col.f1))
+ .forEach(columnsWithMetadata::add);
return columnsWithMetadata;
}
private static boolean isMetadataColumn(String columnName) {
- return DEFAULT_TABLE_NAME.equals(columnName)
- || DEFAULT_SCHEMA_NAME.equals(columnName)
- || DEFAULT_NAMESPACE_NAME.equals(columnName);
+ return METADATA_COLUMNS.stream().anyMatch(col ->
col.f0.equals(columnName));
}
public static SqlSelect parseFilterExpression(String filterExpression) {
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
new file mode 100644
index 000000000..f70e012f1
--- /dev/null
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.parser.metadata;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Contains all supported metadata columns that could be used in transform
expressions. */
+public class MetadataColumns {
+ public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
+ public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
+ public static final String DEFAULT_TABLE_NAME = "__table_name__";
+ public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";
+
+ public static final List<Tuple3<String, DataType, Class<?>>>
METADATA_COLUMNS =
+ Arrays.asList(
+ Tuple3.of(DEFAULT_NAMESPACE_NAME, DataTypes.STRING(),
String.class),
+ Tuple3.of(DEFAULT_SCHEMA_NAME, DataTypes.STRING(),
String.class),
+ Tuple3.of(DEFAULT_TABLE_NAME, DataTypes.STRING(),
String.class),
+ Tuple3.of(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING(),
String.class));
+}