This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 84ef9d5da [FLINK-35981][cdc-runtime] Transform supports referencing one column more than once
84ef9d5da is described below
commit 84ef9d5daa78412b3f2f4dc087e8ea2e4be93b7c
Author: MOBIN <[email protected]>
AuthorDate: Sat Aug 10 00:01:36 2024 +0800
[FLINK-35981][cdc-runtime] Transform supports referencing one column more than once
This closes #3515.
---
.../transform/ProjectionColumnProcessor.java | 8 ++-
.../transform/TransformFilterProcessor.java | 9 ++-
.../transform/PostTransformOperatorTest.java | 71 ++++++++++++++++++++++
3 files changed, 81 insertions(+), 7 deletions(-)
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 3dde9b20c..a27af2370 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
/**
@@ -98,7 +99,9 @@ public class ProjectionColumnProcessor {
// 1 - Add referenced columns
RecordData.FieldGetter[] fieldGetters =
tableInfo.getPreTransformedFieldGetters();
- for (String originalColumnName :
projectionColumn.getOriginalColumnNames()) {
+ LinkedHashSet<String> originalColumnNames =
+ new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
+ for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
case TransformParser.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
@@ -142,7 +145,8 @@ public class ProjectionColumnProcessor {
List<Class<?>> paramTypes = new ArrayList<>();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
String scriptExpression = projectionColumn.getScriptExpression();
- List<String> originalColumnNames =
projectionColumn.getOriginalColumnNames();
+ LinkedHashSet<String> originalColumnNames =
+ new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
for (Column column : columns) {
if (column.getName().equals(originalColumnName)) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 84d483035..d1f67818b 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Stream;
@@ -91,14 +92,12 @@ public class TransformFilterProcessor {
List<Class<?>> argTypes = new ArrayList<>();
String scriptExpression = transformFilter.getScriptExpression();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
- List<String> columnNames = transformFilter.getColumnNames();
+ LinkedHashSet<String> columnNames = new
LinkedHashSet<>(transformFilter.getColumnNames());
for (String columnName : columnNames) {
for (Column column : columns) {
if (column.getName().equals(columnName)) {
- if (!argNames.contains(columnName)) {
- argNames.add(columnName);
-
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
- }
+ argNames.add(columnName);
+
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
break;
}
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 34b710374..067842c31 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -226,6 +226,16 @@ public class PostTransformOperatorTest {
.options(ImmutableMap.of("key1", "value1", "key2",
"value2"))
.build();
+ private static final TableId COLUMN_SQUARE_TABLE =
+ TableId.tableId("my_company", "my_branch", "column_square");
+ private static final Schema COLUMN_SQUARE_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.INT())
+ .physicalColumn("col2", DataTypes.INT())
+ .physicalColumn("square_col2", DataTypes.INT())
+ .primaryKey("col1")
+ .build();
+
@Test
void testDataChangeEventTransform() throws Exception {
PostTransformOperator transform =
@@ -560,6 +570,67 @@ public class PostTransformOperatorTest {
.isEqualTo(new StreamRecord<>(insertEventExpect));
}
+ @Test
+ void testDataChangeEventTransformWithDuplicateColumns() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ COLUMN_SQUARE_TABLE.identifier(),
+ "col1, col2, col2 * col2 as square_col2",
+ "col2 < 3 OR col2 > 5")
+ .build();
+ EventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
COLUMN_SQUARE_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {1, 1,
null}));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new
Object[] {1, 1, 1}));
+
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {6, 6,
null}));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new
Object[] {6, 6, 36}));
+
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {4, 4,
null}));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
COLUMN_SQUARE_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isNull();
+ }
+
@Test
void testTimestampTransform() throws Exception {
PostTransformOperator transform =