This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 999258436 [FLINK-37011][cdc-transform] Improve get source field value by column name in PreTransformProcessor
999258436 is described below

commit 999258436595759556daecddd9c158899b3c60e8
Author: moses <72908278+chaomingzhan...@users.noreply.github.com>
AuthorDate: Tue Jan 14 19:19:06 2025 +0800

    [FLINK-37011][cdc-transform] Improve get source field value by column name in PreTransformProcessor
    
    This closes #3836
---
 .../org/apache/flink/cdc/common/schema/Schema.java |  2 +-
 .../transform/PreTransformChangeInfo.java          | 24 ++++++++++-------
 .../operators/transform/PreTransformProcessor.java | 30 +++++-----------------
 3 files changed, 23 insertions(+), 33 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index 01ca8906b..e452ba547 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -164,7 +164,7 @@ public class Schema implements Serializable {
         return DataTypes.ROW(fields).notNull();
     }
 
-    /** Returns a copy of the schema with a replaced list of {@Column}. */
+    /** Returns a copy of the schema with a replaced list of {@link Column}. */
     public Schema copy(List<Column> columns) {
         return new Schema(
                 columns,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
index 850eb4cc4..f1bdfde08 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
@@ -34,18 +34,20 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * PreTransformChangeInfo caches source / pre-transformed schema, source 
schema field getters, and
  * binary record data generator for pre-transform schema.
  */
 public class PreTransformChangeInfo {
-    private TableId tableId;
-    private Schema sourceSchema;
-    private Schema preTransformedSchema;
-    private RecordData.FieldGetter[] sourceFieldGetters;
-    private BinaryRecordDataGenerator preTransformedRecordDataGenerator;
+    private final TableId tableId;
+    private final Schema sourceSchema;
+    private final Schema preTransformedSchema;
+    private final Map<String, RecordData.FieldGetter> sourceFieldGettersMap;
+    private final BinaryRecordDataGenerator preTransformedRecordDataGenerator;
 
     public static final PreTransformChangeInfo.Serializer SERIALIZER =
             new PreTransformChangeInfo.Serializer();
@@ -54,12 +56,16 @@ public class PreTransformChangeInfo {
             TableId tableId,
             Schema sourceSchema,
             Schema preTransformedSchema,
-            RecordData.FieldGetter[] sourceFieldGetters,
+            RecordData.FieldGetter[] sourceFieldGettersMap,
             BinaryRecordDataGenerator preTransformedRecordDataGenerator) {
         this.tableId = tableId;
         this.sourceSchema = sourceSchema;
         this.preTransformedSchema = preTransformedSchema;
-        this.sourceFieldGetters = sourceFieldGetters;
+        this.sourceFieldGettersMap = new 
HashMap<>(sourceSchema.getColumnCount());
+        for (int i = 0; i < sourceSchema.getColumns().size(); i++) {
+            this.sourceFieldGettersMap.put(
+                    sourceSchema.getColumns().get(i).getName(), 
sourceFieldGettersMap[i]);
+        }
         this.preTransformedRecordDataGenerator = 
preTransformedRecordDataGenerator;
     }
 
@@ -87,8 +93,8 @@ public class PreTransformChangeInfo {
         return preTransformedSchema;
     }
 
-    public RecordData.FieldGetter[] getSourceFieldGetters() {
-        return sourceFieldGetters;
+    public Map<String, RecordData.FieldGetter> getSourceFieldGettersMap() {
+        return sourceFieldGettersMap;
     }
 
     public BinaryRecordDataGenerator getPreTransformedRecordDataGenerator() {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index d4f6fec7d..cd679fcd5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -22,10 +22,10 @@ import 
org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The processor of pre-transform projection in {@link PreTransformOperator}.
@@ -39,7 +39,7 @@ import java.util.List;
  * </ul>
  */
 public class PreTransformProcessor {
-    private PreTransformChangeInfo tableChangeInfo;
+    private final PreTransformChangeInfo tableChangeInfo;
 
     public PreTransformProcessor(PreTransformChangeInfo tableChangeInfo) {
         this.tableChangeInfo = tableChangeInfo;
@@ -62,30 +62,14 @@ public class PreTransformProcessor {
     public BinaryRecordData processFillDataField(BinaryRecordData data) {
         List<Object> valueList = new ArrayList<>();
         List<Column> columns = 
tableChangeInfo.getPreTransformedSchema().getColumns();
-        for (int i = 0; i < columns.size(); i++) {
-            valueList.add(
-                    getValueFromBinaryRecordData(
-                            columns.get(i).getName(),
-                            data,
-                            tableChangeInfo.getSourceSchema().getColumns(),
-                            tableChangeInfo.getSourceFieldGetters()));
+        Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
+                tableChangeInfo.getSourceFieldGettersMap();
+        for (Column column : columns) {
+            RecordData.FieldGetter fieldGetter = 
sourceFieldGettersMap.get(column.getName());
+            valueList.add(fieldGetter.getFieldOrNull(data));
         }
         return tableChangeInfo
                 .getPreTransformedRecordDataGenerator()
                 .generate(valueList.toArray(new Object[0]));
     }
-
-    private Object getValueFromBinaryRecordData(
-            String columnName,
-            BinaryRecordData binaryRecordData,
-            List<Column> columns,
-            RecordData.FieldGetter[] fieldGetters) {
-        for (int i = 0; i < columns.size(); i++) {
-            if (columnName.equals(columns.get(i).getName())) {
-                return DataTypeConverter.convert(
-                        fieldGetters[i].getFieldOrNull(binaryRecordData), 
columns.get(i).getType());
-            }
-        }
-        return null;
-    }
 }

Reply via email to