This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 246e914e9a3451a7bc3d9578e0b2b43021ebe82a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 00:40:29 2025 +0200

    [hotfix] Introduce SinkUpsertMaterializer factory method
---
 .../runtime/operators/sink/SinkUpsertMaterializer.java | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
index 8ba4b792e2b..cd5f66f7a8d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
@@ -222,4 +224,18 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
         }
         return equaliser.equals(newRow, oldRow);
     }
+
+    public static SinkUpsertMaterializer create(
+            StateTtlConfig ttlConfig,
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            int[] inputUpsertKey) {
+        return new SinkUpsertMaterializer(
+                ttlConfig,
+                InternalSerializers.create(physicalRowType),
+                rowEqualiser,
+                upsertKeyEqualiser,
+                inputUpsertKey);
+    }
 }
