This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-3.4
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.4 by this push:
new eabb10972 [FLINK-37741][cdc-runtime] Fix transform operator performance degradation
eabb10972 is described below
commit eabb1097206b03c91e62a1fa0e71ccae81652257
Author: yuxiqian <[email protected]>
AuthorDate: Tue Apr 29 13:57:40 2025 +0800
[FLINK-37741][cdc-runtime] Fix transform operator performance degradation
This closes #4011
---
.../org/apache/flink/cdc/common/schema/Schema.java | 32 ++++++++++++++++++----
1 file changed, 26 insertions(+), 6 deletions(-)
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index 61f7d36bc..2ac780ed9 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -61,6 +61,13 @@ public class Schema implements Serializable {
// Used to index column by name
private transient volatile Map<String, Column> nameToColumns;
+ // Transiently cached fields that are lazily calculated
+ private transient List<String> columnNames;
+
+ private transient List<DataType> columnDataTypes;
+
+ private transient DataType columnRowType;
+
/**
* Schema might be used as a LoadingCache key frequently, and maintaining a cache of hashCode
* would be more efficient.
@@ -104,14 +111,24 @@ public class Schema implements Serializable {
/** Returns all column names. It does not distinguish between different kinds of columns. */
public List<String> getColumnNames() {
- return columns.stream().map(Column::getName).collect(Collectors.toList());
+ if (columnNames == null) {
+ columnNames =
+ Collections.unmodifiableList(
+ columns.stream().map(Column::getName).collect(Collectors.toList()));
+ }
+ return columnNames;
}
/**
* Returns all column data types. It does not distinguish between different kinds of columns.
*/
public List<DataType> getColumnDataTypes() {
- return columns.stream().map(Column::getType).collect(Collectors.toList());
+ if (columnDataTypes == null) {
+ columnDataTypes =
+ Collections.unmodifiableList(
+ columns.stream().map(Column::getType).collect(Collectors.toList()));
+ }
+ return columnDataTypes;
}
/** Returns the primary keys of the table or data collection. */
@@ -159,10 +176,13 @@ public class Schema implements Serializable {
* @see DataTypes#ROW(DataField...)
*/
public DataType toRowDataType() {
- final DataField[] fields =
- columns.stream().map(Schema::columnToField).toArray(DataField[]::new);
- // the row should never be null
- return DataTypes.ROW(fields).notNull();
+ if (columnRowType == null) {
+ final DataField[] fields =
+ columns.stream().map(Schema::columnToField).toArray(DataField[]::new);
+ // the row should never be null
+ columnRowType = DataTypes.ROW(fields).notNull();
+ }
+ return columnRowType;
}
/** Returns a copy of the schema with a replaced list of {@link Column}. */