This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.4
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.4 by this push:
     new eabb10972 [FLINK-37741][cdc-runtime] Fix transform operator performance degradation
eabb10972 is described below

commit eabb1097206b03c91e62a1fa0e71ccae81652257
Author: yuxiqian <[email protected]>
AuthorDate: Tue Apr 29 13:57:40 2025 +0800

    [FLINK-37741][cdc-runtime] Fix transform operator performance degradation
    
    This closes  #4011
---
 .../org/apache/flink/cdc/common/schema/Schema.java | 32 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index 61f7d36bc..2ac780ed9 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -61,6 +61,13 @@ public class Schema implements Serializable {
     // Used to index column by name
     private transient volatile Map<String, Column> nameToColumns;
 
+    // Transiently cached fields that are lazily calculated
+    private transient List<String> columnNames;
+
+    private transient List<DataType> columnDataTypes;
+
+    private transient DataType columnRowType;
+
     /**
      * Schema might be used as a LoadingCache key frequently, and maintaining a cache of hashCode
      * would be more efficient.
@@ -104,14 +111,24 @@ public class Schema implements Serializable {
 
     /** Returns all column names. It does not distinguish between different kinds of columns. */
     public List<String> getColumnNames() {
-        return columns.stream().map(Column::getName).collect(Collectors.toList());
+        if (columnNames == null) {
+            columnNames =
+                    Collections.unmodifiableList(
+                            columns.stream().map(Column::getName).collect(Collectors.toList()));
+        }
+        return columnNames;
     }
 
     /**
      * Returns all column data types. It does not distinguish between different kinds of columns.
      */
     public List<DataType> getColumnDataTypes() {
-        return columns.stream().map(Column::getType).collect(Collectors.toList());
+        if (columnDataTypes == null) {
+            columnDataTypes =
+                    Collections.unmodifiableList(
+                            columns.stream().map(Column::getType).collect(Collectors.toList()));
+        }
+        return columnDataTypes;
     }
 
     /** Returns the primary keys of the table or data collection. */
@@ -159,10 +176,13 @@ public class Schema implements Serializable {
      * @see DataTypes#ROW(DataField...)
      */
     public DataType toRowDataType() {
-        final DataField[] fields =
-                columns.stream().map(Schema::columnToField).toArray(DataField[]::new);
-        // the row should never be null
-        return DataTypes.ROW(fields).notNull();
+        if (columnRowType == null) {
+            final DataField[] fields =
+                    columns.stream().map(Schema::columnToField).toArray(DataField[]::new);
+            // the row should never be null
+            columnRowType = DataTypes.ROW(fields).notNull();
+        }
+        return columnRowType;
     }
 
     /** Returns a copy of the schema with a replaced list of {@link Column}. */

Reply via email to