This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 246e914e9a3451a7bc3d9578e0b2b43021ebe82a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 00:40:29 2025 +0200

    [hotfix] Introduce SinkUpsertMaterializer factory method
---
 .../runtime/operators/sink/SinkUpsertMaterializer.java   | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
index 8ba4b792e2b..cd5f66f7a8d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
@@ -222,4 +224,18 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
         }
         return equaliser.equals(newRow, oldRow);
     }
+
+    public static SinkUpsertMaterializer create(
+            StateTtlConfig ttlConfig,
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            int[] inputUpsertKey) {
+        return new SinkUpsertMaterializer(
+                ttlConfig,
+                InternalSerializers.create(physicalRowType),
+                rowEqualiser,
+                upsertKeyEqualiser,
+                inputUpsertKey);
+    }
 }

Reply via email to