This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a7c0f1bcdc92d550d38a85fee1900572e6aaac5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 16:56:53 2025 +0200

    [FLINK-38461] Introduce SinkUpsertMaterializerV2
---
 .../generated/execution_config_configuration.html  |  18 +++
 .../table/api/config/ExecutionConfigOptions.java   | 105 +++++++++++++
 .../flink/table/data/utils/ProjectedRowData.java   |   2 +
 .../plan/nodes/exec/stream/StreamExecSink.java     | 129 +++++++++++++--
 .../nodes/physical/stream/StreamPhysicalSink.scala |  12 +-
 .../table/api/internal/StatementSetImplTest.java   |  12 +-
 .../table/planner/utils/JsonPlanTestBase.java      |  11 +-
 .../operators/sink/SinkUpsertMaterializerV2.java   | 175 +++++++++++++++++++++
 .../SequencedMultiSetStateConfig.java              |  24 +--
 .../SequencedMultiSetStateContext.java             |  12 +-
 .../sequencedmultisetstate/TimeSelector.java       |   4 +-
 .../sink/SinkUpsertMaterializerMigrationTest.java  |  88 ++++++++---
 .../sink/SinkUpsertMaterializerRescalingTest.java  | 124 ++++++++++++---
 .../operators/sink/SinkUpsertMaterializerTest.java | 120 ++++++++++++--
 .../sink/SinkUpsertMaterializerVersion.java}       |  37 +++--
 .../SequencedMultiSetStateTest.java                |  16 +-
 .../migration-flink-2.2-HEAP-V2-snapshot           | Bin 0 -> 2607 bytes
 .../migration-flink-2.2-ROCKSDB-V2-snapshot        | Bin 0 -> 16108 bytes
 flink-test-utils-parent/flink-test-utils/pom.xml   |   7 +
 .../streaming/util/TestStreamEnvironment.java      |   9 ++
 20 files changed, 796 insertions(+), 109 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index e834d344293..ae6ce2126b4 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -272,6 +272,24 @@ By default no operator is disabled.</td>
             <td><p>Enum</p></td>
             <td>Because of the disorder of ChangeLog data caused by Shuffle in 
distributed system, the data received by Sink may not be the order of global 
upsert. So add upsert materialize operator before upsert sink. It receives the 
upstream changelog records and generate an upsert view for the downstream.<br 
/>By default, the materialize operator will be added when a distributed 
disorder occurs on unique keys. You can also choose no materialization(NONE) or 
force materialization(FORCE [...]
         </tr>
+        <tr>
+            
<td><h5>table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high</h5><br>
 <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>When using strategy=ADAPTIVE, defines the number of entries 
per key when the implementation is changed from VALUE to MAP. If not specified, 
Flink uses state-backend specific defaults (400 for hashmap state backend and 
50 for RocksDB and the rest).<br /></td>
+        </tr>
+        <tr>
+            
<td><h5>table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low</h5><br>
 <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>When using strategy=ADAPTIVE, defines the number of entries 
per key when the implementation is changed from MAP to VALUE. If not specified, 
Flink uses state-backend specific defaults (300 for hashmap state backend and 
40 for RocksDB and the rest).<br /></td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sink.upsert-materialize-strategy.type</h5><br> 
<span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">LEGACY</td>
+            <td><p>Enum</p></td>
+            <td>Which strategy of SinkUpsertMaterializer to use. Supported 
strategies:<br />LEGACY: Simple implementation based on ValueState&lt;List&gt; 
(the original implementation).<br />MAP: SequencedMultiSetState implementation 
based on a combination of several MapState maintaining ordering and fast lookup 
properties.<br />VALUE: Similar to LEGACY, but compatible with MAP and 
therefore allows to switch to ADAPTIVE.<br />ADAPTIVE: Alternate between MAP 
and VALUE depending on the numb [...]
+        </tr>
         <tr>
             <td><h5>table.exec.sort.async-merge-enabled</h5><br> <span 
class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 790c41e5709..e6fd9d9440a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -159,6 +159,74 @@ public class ExecutionConfigOptions {
                                                     + "or force 
materialization(FORCE).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    
key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy="
+                                                            + 
SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ", defines the 
number of entries per key when the implementation is changed from "
+                                                            + 
SinkUpsertMaterializeStrategy.MAP
+                                                            + " to "
+                                                            + 
SinkUpsertMaterializeStrategy.VALUE
+                                                            + ". "
+                                                            + "If not 
specified, Flink uses state-backend specific defaults (300 for hashmap state 
backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    
key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy="
+                                                            + 
SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ", defines the 
number of entries per key when the implementation is changed from "
+                                                            + 
SinkUpsertMaterializeStrategy.VALUE
+                                                            + " to "
+                                                            + 
SinkUpsertMaterializeStrategy.MAP
+                                                            + ". "
+                                                            + "If not 
specified, Flink uses state-backend specific defaults (400 for hashmap state 
backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of 
SinkUpsertMaterializer to use. Supported strategies:")
+                                            .linebreak()
+                                            .text(
+                                                    
SinkUpsertMaterializeStrategy.LEGACY
+                                                            + ": Simple 
implementation based on ValueState<List> (the original implementation).")
+                                            .linebreak()
+                                            .text(
+                                                    
SinkUpsertMaterializeStrategy.MAP
+                                                            + ": 
SequencedMultiSetState implementation based on a combination of several 
MapState maintaining ordering and fast lookup properties.")
+                                            .linebreak()
+                                            .text(
+                                                    
SinkUpsertMaterializeStrategy.VALUE
+                                                            + ": Similar to 
LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.")
+                                            .linebreak()
+                                            .text(
+                                                    
SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ": Alternate 
between MAP and VALUE depending on the number of entries for the given key 
starting with VALUE and switching to MAP upon reaching threshold.high value 
(and back to VALUE, when reaching low).")
+                                            .build());
+
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<SinkKeyedShuffle> 
TABLE_EXEC_SINK_KEYED_SHUFFLE =
             key("table.exec.sink.keyed-shuffle")
@@ -928,6 +996,43 @@ public class ExecutionConfigOptions {
         FIXED_DELAY
     }
 
+    /** SinkUpsertMaterializer strategy. */
+    @PublicEvolving
+    public enum SinkUpsertMaterializeStrategy {
+        /**
+         * Simple implementation based on {@code ValueState<List>} (the 
original implementation).
+         *
+         * <ul>
+         *   <li>optimal for cases with history under approx. 100 elements
+         *   <li>limited TTL support (per key granularity, i.e. no expiration 
for old history
+         *       elements)
+         * </ul>
+         */
+        LEGACY,
+        /**
+         * SequencedMultiSetState-based implementation based on a combination of 
several MapState
+         * maintaining ordering and fast lookup properties.
+         *
+         * <ul>
+         *   <li>faster and more memory-efficient on long histories
+         *   <li>slower on short histories
+         *   <li>currently, no TTL support (to be added in the future)
+         *   <li>requires more space
+         * </ul>
+         */
+        MAP,
+        /**
+         * Similar to LEGACY, but compatible with MAP and therefore allows 
switching to ADAPTIVE.
+         */
+        VALUE,
+        /**
+         * Alternates between MAP and VALUE depending on the number of entries 
for the given key,
+         * starting with VALUE, switching to MAP upon reaching the 
threshold.high value, and
+         * switching back to VALUE when the entry count drops to threshold.low.
+         */
+        ADAPTIVE
+    }
+
     /** Determine if CAST operates using the legacy behaviour or the new one. 
*/
     @Deprecated
     public enum LegacyCastBehaviour implements DescribedEnum {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
index 8e9e74e20ba..c09da8e49ac 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
@@ -190,6 +190,8 @@ public class ProjectedRowData implements RowData {
                 + Arrays.toString(indexMapping)
                 + ", mutableRow="
                 + row
+                + ", isNullAtNonProjected="
+                + isNullAtNonProjected
                 + '}';
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 59c517b9474..e1052a96533 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -19,16 +19,22 @@
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.HashCodeGenerator;
 import org.apache.flink.table.planner.connectors.CollectDynamicSink;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -42,10 +48,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
@@ -57,13 +65,21 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW;
 
 /**
  * Stream {@link ExecNode} to write data into an external sink defined by a 
{@link
@@ -89,9 +105,11 @@ import java.util.stream.Collectors;
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Object> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StreamExecSink.class);
 
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = 
"inputChangelogMode";
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = 
"requireUpsertMaterialize";
+    public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = 
"upsertMaterializeStrategy";
     public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey";
 
     /** New introduced state metadata to enable operator-level state TTL 
configuration. */
@@ -104,6 +122,10 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     private final boolean upsertMaterialize;
 
+    @JsonProperty(FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final SinkUpsertMaterializeStrategy upsertMaterializeStrategy;
+
     @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY)
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     private final int[] inputUpsertKey;
@@ -120,6 +142,7 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
             InputProperty inputProperty,
             LogicalType outputType,
             boolean upsertMaterialize,
+            SinkUpsertMaterializeStrategy upsertMaterializeStrategy,
             int[] inputUpsertKey,
             String description) {
         this(
@@ -129,6 +152,7 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                 tableSinkSpec,
                 inputChangelogMode,
                 upsertMaterialize,
+                upsertMaterializeStrategy,
                 // do not serialize state metadata if upsertMaterialize is not 
required
                 upsertMaterialize
                         ? 
StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME)
@@ -147,6 +171,8 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
             @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK) DynamicTableSinkSpec 
tableSinkSpec,
             @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) ChangelogMode 
inputChangelogMode,
             @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean 
upsertMaterialize,
+            @Nullable @JsonProperty(FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY)
+                    SinkUpsertMaterializeStrategy 
sinkUpsertMaterializeStrategy,
             @Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> 
stateMetadataList,
             @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) int[] inputUpsertKey,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
@@ -166,6 +192,7 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         this.upsertMaterialize = upsertMaterialize;
         this.inputUpsertKey = inputUpsertKey;
         this.stateMetadataList = stateMetadataList;
+        this.upsertMaterializeStrategy = sinkUpsertMaterializeStrategy;
     }
 
     @SuppressWarnings("unchecked")
@@ -231,9 +258,11 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
             ClassLoader classLoader,
             RowType physicalRowType,
             int[] inputUpsertKey) {
+
         final GeneratedRecordEqualiser rowEqualiser =
                 new EqualiserCodeGenerator(physicalRowType, classLoader)
                         .generateRecordEqualiser("SinkMaterializeEqualiser");
+
         final GeneratedRecordEqualiser upsertKeyEqualiser =
                 inputUpsertKey == null
                         ? null
@@ -243,16 +272,37 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                                         classLoader)
                                 
.generateRecordEqualiser("SinkMaterializeUpsertKeyEqualiser");
 
-        final long stateRetentionTime =
-                StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList);
+        GeneratedHashFunction rowHashFunction =
+                HashCodeGenerator.generateRowHash(
+                        new CodeGeneratorContext(config, classLoader),
+                        physicalRowType,
+                        "hashCode",
+                        IntStream.range(0, 
physicalRowType.getFieldCount()).toArray());
 
-        SinkUpsertMaterializer operator =
-                new SinkUpsertMaterializer(
-                        StateConfigUtil.createTtlConfig(stateRetentionTime),
-                        InternalSerializers.create(physicalRowType),
-                        rowEqualiser,
+        final GeneratedHashFunction upsertKeyHashFunction =
+                inputUpsertKey == null
+                        ? null
+                        : HashCodeGenerator.generateRowHash(
+                                new CodeGeneratorContext(config, classLoader),
+                                RowTypeUtils.projectRowType(physicalRowType, 
inputUpsertKey),
+                                "generated_hashcode_for_" + 
inputUpsertKey.length + "_keys",
+                                IntStream.range(0, 
inputUpsertKey.length).toArray());
+
+        StateTtlConfig ttlConfig =
+                StateConfigUtil.createTtlConfig(
+                        StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList));
+
+        final OneInputStreamOperator<RowData, RowData> operator =
+                createSumOperator(
+                        config,
+                        physicalRowType,
+                        inputUpsertKey,
                         upsertKeyEqualiser,
-                        inputUpsertKey);
+                        upsertKeyHashFunction,
+                        ttlConfig,
+                        rowEqualiser,
+                        rowHashFunction);
+
         final String[] fieldNames = 
physicalRowType.getFieldNames().toArray(new String[0]);
         final List<String> pkFieldNames =
                 Arrays.stream(primaryKeys)
@@ -280,4 +330,65 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         materializeTransform.setStateKeyType(keySelector.getProducedType());
         return materializeTransform;
     }
+
+    private OneInputStreamOperator<RowData, RowData> createSumOperator(
+            ExecNodeConfig config,
+            RowType physicalRowType,
+            int[] inputUpsertKey,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            GeneratedHashFunction upsertKeyHashFunction,
+            StateTtlConfig ttlConfig,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedHashFunction rowHashFunction) {
+
+        SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
+                Optional.ofNullable(upsertMaterializeStrategy)
+                        .orElse(SinkUpsertMaterializeStrategy.LEGACY);
+
+        return sinkUpsertMaterializeStrategy == 
SinkUpsertMaterializeStrategy.LEGACY
+                ? SinkUpsertMaterializer.create(
+                        ttlConfig,
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        inputUpsertKey)
+                : SinkUpsertMaterializerV2.create(
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        rowHashFunction,
+                        upsertKeyHashFunction,
+                        inputUpsertKey,
+                        createStateConfig(
+                                sinkUpsertMaterializeStrategy,
+                                TimeDomain.EVENT_TIME,
+                                ttlConfig,
+                                config));
+    }
+
+    private static SequencedMultiSetStateConfig createStateConfig(
+            SinkUpsertMaterializeStrategy strategy,
+            TimeDomain ttlTimeDomain,
+            StateTtlConfig ttlConfig,
+            ReadableConfig config) {
+        if (ttlConfig.isEnabled()) {
+            // https://issues.apache.org/jira/browse/FLINK-38463
+            LOG.warn("TTL is not supported and will be disabled: {}", 
ttlConfig);
+            ttlConfig = StateTtlConfig.DISABLED;
+        }
+        switch (strategy) {
+            case VALUE:
+                return SequencedMultiSetStateConfig.forValue(ttlTimeDomain, 
ttlConfig);
+            case MAP:
+                return SequencedMultiSetStateConfig.forMap(ttlTimeDomain, 
ttlConfig);
+            case ADAPTIVE:
+                return SequencedMultiSetStateConfig.adaptive(
+                        ttlTimeDomain,
+                        
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH),
+                        
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW),
+                        ttlConfig);
+            default:
+                throw new IllegalArgumentException("Unsupported strategy: " + 
strategy);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 40eea54b991..332ab0bf89b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy,
 TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
 import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
@@ -102,13 +103,22 @@ class StreamPhysicalSink(
       .reuseOrCreate(cluster.getMetadataQuery)
       .getUpsertKeys(inputRel)
 
+    val config = unwrapTableConfig(this)
     new StreamExecSink(
-      unwrapTableConfig(this),
+      config,
       tableSinkSpec,
       inputChangelogMode,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       upsertMaterialize,
+      // persist the upsertMaterialize strategy separately in the compiled plan 
to make it immutable;
+      // null means the default (LEGACY) and is not written to the compiled 
plan for compatibility
+      // (in particular, with the existing tests);
+      // the strategy cannot be re-read later from the node config, because 
that config is merged with the new environment on restore
+      config
+        .getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY)
+        .filter(strategy => 
!strategy.equals(SinkUpsertMaterializeStrategy.LEGACY))
+        .orElse(null),
       UpsertKeyUtil.getSmallestKey(inputUpsertKeys),
       getRelDetailedDescription)
   }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java
index 1425f11eea9..c01b688bd76 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java
@@ -21,11 +21,13 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableEnvironment;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link StatementSetImpl}. */
@@ -35,9 +37,13 @@ class StatementSetImplTest {
 
     @BeforeEach
     void setup() {
-        tableEnv =
-                (TableEnvironmentInternal)
-                        
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+        // prevent test randomization from changing the actual plan
+        settings.getConfiguration()
+                .set(
+                        TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
+                        SinkUpsertMaterializeStrategy.LEGACY);
+        tableEnv = (TableEnvironmentInternal) 
TableEnvironment.create(settings);
     }
 
     @Test
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 6d6dbcf4386..4bf7fbb957c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -53,6 +54,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -74,7 +76,14 @@ public abstract class JsonPlanTestBase {
 
     @BeforeEach
     protected void setup() throws Exception {
-        tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+        // overwrite any non-default strategy (potentially set by test 
randomization)
+        // to avoid test failures caused by incompatible compiled plans
+        settings.getConfiguration()
+                .set(
+                        TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
+                        SinkUpsertMaterializeStrategy.LEGACY);
+        tableEnv = TableEnvironment.create(settings);
     }
 
     @AfterEach
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java
new file mode 100644
index 00000000000..2adbc71333a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.RowTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An operator that maintains incoming records in state corresponding to the 
upsert keys and
+ * generates an upsert view for the downstream operator.
+ *
+ * <ul>
+ *   <li>Adds an insertion to state and emits it with updated {@link RowKind}.
+ *   <li>Applies a deletion to state.
+ *   <li>Emits a deletion with updated {@link RowKind} iff it affects the last record or the state is
+ *       empty afterward. A deletion of an already updated record is swallowed.
+ * </ul>
+ */
+@Internal
+public class SinkUpsertMaterializerV2 extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SinkUpsertMaterializerV2.class);
+
+    private final SequencedMultiSetStateContext stateParameters;
+
+    // Buffer of emitted insertions on which deletions will be applied first.
+    // The row kind might be +I or +U and will be ignored when applying the deletion.
+    // NOTE(review): this comment appears to describe the multiset state, not the collector below — confirm placement.
+    private transient TimestampedCollector<RowData> collector;
+
+    private transient SequencedMultiSetState<RowData> orderedMultiSetState;
+    private final boolean hasUpsertKey;
+
+    public SinkUpsertMaterializerV2(
+            boolean hasUpsertKey, SequencedMultiSetStateContext 
stateParameters) {
+        this.hasUpsertKey = hasUpsertKey;
+        this.stateParameters = stateParameters;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        orderedMultiSetState =
+                SequencedMultiSetState.create(
+                        stateParameters,
+                        getRuntimeContext(),
+                        getKeyedStateStore().getBackendTypeIdentifier());
+        collector = new TimestampedCollector<>(output);
+        LOG.info("Opened {} with upsert key: {}", 
this.getClass().getSimpleName(), hasUpsertKey);
+    }
+
+    @SuppressWarnings("OptionalGetWithoutIsPresent")
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        final RowData row = element.getValue();
+        final long timestamp = element.getTimestamp();
+
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                if (hasUpsertKey) {
+                    collect(row, orderedMultiSetState.add(row, 
timestamp).wasEmpty());
+                } else {
+                    collect(row, orderedMultiSetState.append(row, 
timestamp).wasEmpty());
+                }
+                break;
+
+            case UPDATE_BEFORE:
+            case DELETE:
+                StateChangeInfo<RowData> removalResult = 
orderedMultiSetState.remove(row);
+                switch (removalResult.getChangeType()) {
+                    case REMOVAL_OTHER:
+                        // do nothing;
+                        break;
+                    case REMOVAL_NOT_FOUND:
+                        LOG.warn("Not found record to retract"); // not logging the record for security reasons
+                        break;
+                    case REMOVAL_ALL:
+                        collect(removalResult.getPayload().get(), 
RowKind.DELETE);
+                        break;
+                    case REMOVAL_LAST_ADDED:
+                        collect(removalResult.getPayload().get(), 
RowKind.UPDATE_AFTER);
+                        break;
+                    default:
+                        throw new IllegalArgumentException(
+                                "Unexpected removal result type: " + 
removalResult.getChangeType());
+                }
+        }
+    }
+
+    private void collect(RowData row, boolean notExisted) {
+        collect(row, getRowKind(notExisted));
+    }
+
+    private RowKind getRowKind(boolean notExisted) {
+        return notExisted ? RowKind.INSERT : RowKind.UPDATE_AFTER;
+    }
+
+    private void collect(RowData row, RowKind withKind) {
+        RowKind orig = row.getRowKind();
+        row.setRowKind(withKind);
+        collector.collect(row);
+        row.setRowKind(orig);
+    }
+
+    public static SinkUpsertMaterializerV2 create(
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            GeneratedHashFunction rowHashFunction,
+            GeneratedHashFunction upsertKeyHashFunction,
+            int[] inputUpsertKey,
+            SequencedMultiSetStateConfig stateSettings) {
+
+        boolean hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length 
> 0;
+
+        return new SinkUpsertMaterializerV2(
+                hasUpsertKey,
+                new SequencedMultiSetStateContext(
+                        checkNotNull(
+                                hasUpsertKey
+                                        ? InternalSerializers.create(
+                                                RowTypeUtils.projectRowType(
+                                                        physicalRowType, 
inputUpsertKey))
+                                        : 
InternalSerializers.create(physicalRowType)),
+                        checkNotNull(hasUpsertKey ? upsertKeyEqualiser : 
rowEqualiser),
+                        checkNotNull(hasUpsertKey ? upsertKeyHashFunction : 
rowHashFunction),
+                        InternalSerializers.create(physicalRowType),
+                        row ->
+                                hasUpsertKey
+                                        ? 
ProjectedRowData.from(inputUpsertKey, true)
+                                                .replaceRow(row)
+                                        : row,
+                        stateSettings));
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
index a6b16c343d8..1d55937abc7 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
@@ -20,24 +20,28 @@ package 
org.apache.flink.table.runtime.sequencedmultisetstate;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.streaming.api.TimeDomain;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Configuration for {@link SequencedMultiSetState}. */
-public class SequencedMultiSetStateConfig {
+public class SequencedMultiSetStateConfig implements Serializable {
 
-    private final SequencedMultiSetState.Strategy strategy;
+    private static final long serialVersionUID = 1L;
+
+    private final Strategy strategy;
     private final @Nullable Long adaptiveHighThresholdOverride;
     private final @Nullable Long adaptiveLowThresholdOverride;
     private final StateTtlConfig ttlConfig;
     private final TimeSelector ttlTimeSelector;
 
     public SequencedMultiSetStateConfig(
-            SequencedMultiSetState.Strategy strategy,
+            Strategy strategy,
             @Nullable Long adaptiveHighThresholdOverride,
             @Nullable Long adaptiveLowThresholdOverride,
             StateTtlConfig ttlConfig,
@@ -51,7 +55,7 @@ public class SequencedMultiSetStateConfig {
     }
 
     public SequencedMultiSetStateConfig(
-            SequencedMultiSetState.Strategy strategy,
+            Strategy strategy,
             @Nullable Long adaptiveHighThresholdOverride,
             @Nullable Long adaptiveLowThresholdOverride,
             StateTtlConfig ttlConfig,
@@ -74,22 +78,22 @@ public class SequencedMultiSetStateConfig {
     public static SequencedMultiSetStateConfig forMap(
             TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) {
         return new SequencedMultiSetStateConfig(
-                SequencedMultiSetState.Strategy.MAP_STATE, null, null, 
ttlConfig, ttlTimeDomain);
+                Strategy.MAP_STATE, null, null, ttlConfig, ttlTimeDomain);
     }
 
     public static SequencedMultiSetStateConfig forValue(
             TimeDomain ttlTimeDomain, StateTtlConfig ttl) {
         return new SequencedMultiSetStateConfig(
-                SequencedMultiSetState.Strategy.VALUE_STATE, null, null, ttl, 
ttlTimeDomain);
+                Strategy.VALUE_STATE, null, null, ttl, ttlTimeDomain);
     }
 
     public static SequencedMultiSetStateConfig adaptive(
             TimeDomain ttlTimeDomain,
-            long adaptiveHighThresholdOverride,
-            long adaptiveLowThresholdOverride,
+            @Nullable Long adaptiveHighThresholdOverride,
+            @Nullable Long adaptiveLowThresholdOverride,
             StateTtlConfig ttl) {
         return new SequencedMultiSetStateConfig(
-                SequencedMultiSetState.Strategy.ADAPTIVE,
+                Strategy.ADAPTIVE,
                 adaptiveHighThresholdOverride,
                 adaptiveLowThresholdOverride,
                 ttl,
@@ -100,7 +104,7 @@ public class SequencedMultiSetStateConfig {
         return ttlTimeSelector;
     }
 
-    public SequencedMultiSetState.Strategy getStrategy() {
+    public Strategy getStrategy() {
         return strategy;
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
index b7412d2abcd..939eea59b95 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
@@ -23,24 +23,30 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 
+import java.io.Serializable;
 import java.util.function.Function;
 
 /** {@link SequencedMultiSetState} (creation) context. */
-public class SequencedMultiSetStateContext {
+public class SequencedMultiSetStateContext implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     public final SequencedMultiSetStateConfig config;
     public final TypeSerializer<RowData> keySerializer;
     public final GeneratedRecordEqualiser generatedKeyEqualiser;
     public final GeneratedHashFunction generatedKeyHashFunction;
     public final TypeSerializer<RowData> recordSerializer;
-    public final Function<RowData, RowData> keyExtractor;
+    public final KeyExtractor keyExtractor;
+
+    /** A {@link Serializable} {@link Function} extracting the state key from a record. */
+    public interface KeyExtractor extends Function<RowData, RowData>, Serializable {}
 
     public SequencedMultiSetStateContext(
             TypeSerializer<RowData> keySerializer,
             GeneratedRecordEqualiser generatedKeyEqualiser,
             GeneratedHashFunction generatedKeyHashFunction,
             TypeSerializer<RowData> recordSerializer,
-            Function<RowData, RowData> keyExtractor,
+            KeyExtractor keyExtractor,
             SequencedMultiSetStateConfig config) {
         this.keySerializer = keySerializer;
         this.generatedKeyEqualiser = generatedKeyEqualiser;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
index 40fe6f456e6..057a833b8a8 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.util.clock.SystemClock;
 
+import java.io.Serializable;
+
 @Internal
 @FunctionalInterface
-public interface TimeSelector {
+public interface TimeSelector extends Serializable {
 
     long getTimestamp(long elementTimestamp);
 
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
index 1ab7ac4ae84..b0ae3c26827 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.test.util.MigrationTest;
@@ -44,6 +47,8 @@ import static org.apache.flink.FlinkVersion.current;
 import static 
org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename;
 import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR;
 import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER;
+import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.GENERATED_HASH_FUNCTION;
+import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.GENERATED_UPSERT_HASH_FUNCTION;
 import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES;
 import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG;
 import static 
org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY;
@@ -74,11 +79,14 @@ public class SinkUpsertMaterializerMigrationTest implements 
MigrationTest {
         for (FlinkVersion fromVersion : versions) {
             for (SinkUpsertMaterializerStateBackend backend :
                     SinkUpsertMaterializerStateBackend.values()) {
-                result.add(
-                        new Object[] {
-                            new SinkOperationMode(fromVersion, backend),
-                            new SinkOperationMode(current(), backend)
-                        });
+                for (SinkUpsertMaterializerVersion sumVersion :
+                        SinkUpsertMaterializerVersion.values()) {
+                    result.add(
+                            new Object[] {
+                                new SinkOperationMode(fromVersion, backend, 
sumVersion),
+                                new SinkOperationMode(current(), backend, 
sumVersion)
+                            });
+                }
             }
         }
         return result;
@@ -96,13 +104,33 @@ public class SinkUpsertMaterializerMigrationTest 
implements MigrationTest {
     private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
             SinkOperationMode mode, String snapshotPath) throws Exception {
         int[] inputUpsertKey = {UPSERT_KEY};
-        OneInputStreamOperator<RowData, RowData> materializer =
-                SinkUpsertMaterializer.create(
-                        TTL_CONFIG,
-                        RowType.of(LOGICAL_TYPES),
-                        EQUALISER,
-                        UPSERT_KEY_EQUALISER,
-                        inputUpsertKey);
+        StateTtlConfig ttlConfig = mode.sumVersion.reconfigureTtl(TTL_CONFIG);
+        OneInputStreamOperator<RowData, RowData> materializer;
+        switch (mode.sumVersion) {
+            case V1:
+                materializer =
+                        SinkUpsertMaterializer.create(
+                                ttlConfig,
+                                RowType.of(LOGICAL_TYPES),
+                                EQUALISER,
+                                UPSERT_KEY_EQUALISER,
+                                inputUpsertKey);
+                break;
+            case V2:
+                materializer =
+                        SinkUpsertMaterializerV2.create(
+                                RowType.of(LOGICAL_TYPES),
+                                EQUALISER,
+                                UPSERT_KEY_EQUALISER,
+                                GENERATED_HASH_FUNCTION,
+                                GENERATED_UPSERT_HASH_FUNCTION,
+                                inputUpsertKey,
+                                SequencedMultiSetStateConfig.defaults(
+                                        TimeDomain.PROCESSING_TIME, 
ttlConfig));
+                break;
+            default:
+                throw new IllegalArgumentException(mode.sumVersion.name());
+        }
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
                 SinkUpsertMaterializerTest.createHarness(
                         materializer, mode.stateBackend, LOGICAL_TYPES);
@@ -141,26 +169,34 @@ public class SinkUpsertMaterializerMigrationTest 
implements MigrationTest {
 
         testHarness.setStateTtlProcessingTime(1002);
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        ASSERTOR.shouldEmitNothing(testHarness);
+        if (migrateTo.sumVersion.isTtlSupported()) {
+            ASSERTOR.shouldEmitNothing(testHarness);
+        } else {
+            ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 4L, 1, 
"a4"));
+        }
     }
 
     private static String getFileName(SinkOperationMode mode) {
         return String.format(
-                "migration-flink-%s-%s-%s-snapshot", mode.version, 
mode.stateBackend, "V1");
+                "migration-flink-%s-%s-%s-snapshot",
+                mode.version, mode.stateBackend, mode.sumVersion);
     }
 
     @SnapshotsGenerator
     public void writeSnapshot(FlinkVersion version) throws Exception {
         for (SinkUpsertMaterializerStateBackend stateBackend :
                 SinkUpsertMaterializerStateBackend.values()) {
-            SinkOperationMode mode = new SinkOperationMode(version, 
stateBackend);
-            try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
-                    createHarness(mode, null)) {
-                testCorrectnessBeforeSnapshot(harness);
-                Path parent = Paths.get("src/test/resources", FOLDER_NAME);
-                Files.createDirectories(parent);
-                OperatorSnapshotUtil.writeStateHandle(
-                        harness.snapshot(1L, 1L), 
parent.resolve(getFileName(mode)).toString());
+            for (SinkUpsertMaterializerVersion sumVersion :
+                    SinkUpsertMaterializerVersion.values()) {
+                SinkOperationMode mode = new SinkOperationMode(version, 
stateBackend, sumVersion);
+                try (OneInputStreamOperatorTestHarness<RowData, RowData> 
harness =
+                        createHarness(mode, null)) {
+                    testCorrectnessBeforeSnapshot(harness);
+                    Path parent = Paths.get("src/test/resources", FOLDER_NAME);
+                    Files.createDirectories(parent);
+                    OperatorSnapshotUtil.writeStateHandle(
+                            harness.snapshot(1L, 1L), 
parent.resolve(getFileName(mode)).toString());
+                }
             }
         }
     }
@@ -174,16 +210,20 @@ public class SinkUpsertMaterializerMigrationTest 
implements MigrationTest {
     private static class SinkOperationMode {
         private final FlinkVersion version;
         private final SinkUpsertMaterializerStateBackend stateBackend;
+        private final SinkUpsertMaterializerVersion sumVersion;
 
         private SinkOperationMode(
-                FlinkVersion version, SinkUpsertMaterializerStateBackend 
stateBackend) {
+                FlinkVersion version,
+                SinkUpsertMaterializerStateBackend stateBackend,
+                SinkUpsertMaterializerVersion sumVersion) {
             this.version = version;
             this.stateBackend = stateBackend;
+            this.sumVersion = sumVersion;
         }
 
         @Override
         public String toString() {
-            return String.format("flink=%s, state=%s}", version, stateBackend);
+            return String.format("flink=%s, state=%s, sum=%s}", version, 
stateBackend, sumVersion);
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java
index 9d8990264bc..833634b28c7 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java
@@ -19,20 +19,24 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.HashFunction;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -65,47 +69,58 @@ import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 @RunWith(Parameterized.class)
 public class SinkUpsertMaterializerRescalingTest {
 
-    @Parameter public SinkUpsertMaterializerStateBackend backend;
+    @Parameter(0)
+    public SinkUpsertMaterializerVersion sumVersion;
 
-    @Parameterized.Parameters(name = "stateBackend={0}")
+    @Parameter(1)
+    public SinkUpsertMaterializerStateBackend backend;
+
+    @Parameterized.Parameters(name = "sumVersion={0}, stateBackend={1}")
     public static Object[][] generateTestParameters() {
         List<Object[]> result = new ArrayList<>();
-        for (SinkUpsertMaterializerStateBackend backend :
-                SinkUpsertMaterializerStateBackend.values()) {
-            result.add(new Object[] {backend});
+        for (SinkUpsertMaterializerVersion sumVersion : 
SinkUpsertMaterializerVersion.values()) {
+            for (SinkUpsertMaterializerStateBackend backend :
+                    SinkUpsertMaterializerStateBackend.values()) {
+                result.add(new Object[] {sumVersion, backend});
+            }
         }
         return result.toArray(new Object[0][]);
     }
 
     @Test
     public void testScaleUpThenDown() throws Exception {
-        testRescaleFromToFrom(10, 2, 3, backend, backend);
+        testRescaleFromToFrom(10, 2, 3, backend, backend, sumVersion);
     }
 
     @Test
     public void testScaleDownThenUp() throws Exception {
-        testRescaleFromToFrom(10, 3, 2, backend, backend);
+        testRescaleFromToFrom(10, 3, 2, backend, backend, sumVersion);
     }
 
     @Test
     public void testRecovery() throws Exception {
-        testRescaleFromToFrom(1, 1, 1, backend, backend);
+        testRescaleFromToFrom(1, 1, 1, backend, backend, sumVersion);
     }
 
     @Test
     public void testForwardAndBackwardMigration() throws Exception {
-        testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend));
+        testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend), 
sumVersion);
     }
 
     @Test
     public void testScaleUpThenDownWithMigration() throws Exception {
-        testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend));
+        testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend), 
sumVersion);
     }
 
     @Test
     public void testScaleDownThenUpWithMigration() throws Exception {
         testRescaleFromToFrom(
-                7, 5, 1, backend, 
getOtherBackend(SinkUpsertMaterializerStateBackend.HEAP));
+                7,
+                5,
+                1,
+                backend,
+                getOtherBackend(SinkUpsertMaterializerStateBackend.HEAP),
+                sumVersion);
     }
 
     private SinkUpsertMaterializerStateBackend getOtherBackend(
@@ -121,7 +136,8 @@ public class SinkUpsertMaterializerRescalingTest {
             final int fromParallelism,
             final int toParallelism,
             final SinkUpsertMaterializerStateBackend fromBackend,
-            final SinkUpsertMaterializerStateBackend toBackend)
+            final SinkUpsertMaterializerStateBackend toBackend,
+            final SinkUpsertMaterializerVersion sumVersion)
             throws Exception {
 
         int[] currentParallelismRef = new int[] {fromParallelism};
@@ -150,7 +166,13 @@ public class SinkUpsertMaterializerRescalingTest {
                 };
 
         initHarnessesAndMaterializers(
-                harnesses, materializers, fromBackend, maxParallelism, 
fromParallelism, null);
+                harnesses,
+                materializers,
+                fromBackend,
+                maxParallelism,
+                fromParallelism,
+                null,
+                sumVersion);
 
         int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1"));
         ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
@@ -163,7 +185,13 @@ public class SinkUpsertMaterializerRescalingTest {
 
         currentParallelismRef[0] = toParallelism;
         initHarnessesAndMaterializers(
-                harnesses, materializers, toBackend, maxParallelism, 
toParallelism, subtaskStates);
+                harnesses,
+                materializers,
+                toBackend,
+                maxParallelism,
+                toParallelism,
+                subtaskStates,
+                sumVersion);
 
         idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3"));
         ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 
3L, 1, "a3"));
@@ -180,7 +208,8 @@ public class SinkUpsertMaterializerRescalingTest {
                 fromBackend,
                 maxParallelism,
                 fromParallelism,
-                subtaskStates);
+                subtaskStates,
+                sumVersion);
 
         idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
         ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 
3L, 1, "a3"));
@@ -202,7 +231,11 @@ public class SinkUpsertMaterializerRescalingTest {
                 .forEach(h -> h.setStateTtlProcessingTime(1002));
 
         idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
-        ASSERTOR.shouldEmitNothing(harnesses[idx]);
+        if (sumVersion.isTtlSupported()) {
+            ASSERTOR.shouldEmitNothing(harnesses[idx]);
+        } else {
+            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 4L, 
1, "a4"));
+        }
 
         Arrays.stream(harnesses)
                 .filter(Objects::nonNull)
@@ -222,16 +255,36 @@ public class SinkUpsertMaterializerRescalingTest {
             SinkUpsertMaterializerStateBackend backend,
             int maxParallelism,
             int parallelism,
-            @Nullable List<OperatorSubtaskState> subtaskStates)
+            @Nullable List<OperatorSubtaskState> subtaskStates,
+            SinkUpsertMaterializerVersion sumVersion)
             throws Exception {
         for (int i = 0; i < parallelism; ++i) {
-            materializers[i] =
-                    SinkUpsertMaterializer.create(
-                            TTL_CONFIG,
-                            RowType.of(LOGICAL_TYPES),
-                            EQUALISER,
-                            UPSERT_KEY_EQUALISER,
-                            null);
+            switch (sumVersion) {
+                case V1:
+                    materializers[i] =
+                            SinkUpsertMaterializer.create(
+                                    TTL_CONFIG,
+                                    RowType.of(LOGICAL_TYPES),
+                                    EQUALISER,
+                                    UPSERT_KEY_EQUALISER,
+                                    null);
+                    break;
+                case V2:
+                    materializers[i] =
+                            SinkUpsertMaterializerV2.create(
+                                    RowType.of(LOGICAL_TYPES),
+                                    EQUALISER,
+                                    UPSERT_KEY_EQUALISER,
+                                    HASH_FUNCTION,
+                                    UPSERT_KEY_HASH_FUNCTION,
+                                    null,
+                                    SequencedMultiSetStateConfig.defaults(
+                                            TimeDomain.PROCESSING_TIME,
+                                            
sumVersion.reconfigureTtl(TTL_CONFIG)));
+                    break;
+                default:
+                    throw new IllegalArgumentException("unknown version: " + 
sumVersion);
+            }
             harnesses[i] =
                     new KeyedOneInputStreamOperatorTestHarness<>(
                             materializers[i],
@@ -329,6 +382,18 @@ public class SinkUpsertMaterializerRescalingTest {
         }
     }
 
+    private static class MyGeneratedHashFunction extends GeneratedHashFunction 
{
+
+        public MyGeneratedHashFunction() {
+            super("", "", new Object[0], new Configuration());
+        }
+
+        @Override
+        public HashFunction newInstance(ClassLoader classLoader) {
+            return new TestRecordEqualiser();
+        }
+    }
+
     private static final StateTtlConfig TTL_CONFIG = 
StateConfigUtil.createTtlConfig(1000);
 
     private static final LogicalType[] LOGICAL_TYPES =
@@ -342,6 +407,8 @@ public class SinkUpsertMaterializerRescalingTest {
 
     private static final GeneratedRecordEqualiser EQUALISER = new 
MyGeneratedRecordEqualiser();
 
+    private static final GeneratedHashFunction HASH_FUNCTION = new 
MyGeneratedHashFunction();
+
     private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
@@ -350,4 +417,13 @@ public class SinkUpsertMaterializerRescalingTest {
                     return new TestUpsertKeyEqualiser();
                 }
             };
+
+    private static final GeneratedHashFunction UPSERT_KEY_HASH_FUNCTION =
+            new GeneratedHashFunction("", "", new Object[0], new 
Configuration()) {
+
+                @Override
+                public HashFunction newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index f8c9a44017c..abe0d9ad213 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -29,9 +30,13 @@ import 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -49,10 +54,13 @@ import org.junit.runners.Parameterized.Parameter;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.java.tuple.Tuple2.of;
+import static org.apache.flink.streaming.api.TimeDomain.PROCESSING_TIME;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
@@ -66,16 +74,22 @@ public class SinkUpsertMaterializerTest {
 
     static final int UPSERT_KEY = 0;
 
-    @Parameter public SinkUpsertMaterializerStateBackend stateBackend;
+    @Parameter(0)
+    public SinkUpsertMaterializeStrategy strategy;
 
-    @Parameterized.Parameters(name = "stateBackend={0}")
-    public static Object[][] generateTestParameters() {
+    @Parameter(1)
+    public SinkUpsertMaterializerStateBackend stateBackend;
+
+    @Parameterized.Parameters(name = "stateStrategy={0}, stateBackend={1}, ")
+    public static Collection<Object[]> generateTestParameters() {
         List<Object[]> result = new ArrayList<>();
         for (SinkUpsertMaterializerStateBackend backend :
                 SinkUpsertMaterializerStateBackend.values()) {
-            result.add(new Object[] {backend});
+            for (SinkUpsertMaterializeStrategy strategy : 
SinkUpsertMaterializeStrategy.values()) {
+                result.add(new Object[] {strategy, backend});
+            }
         }
-        return result.toArray(new Object[0][]);
+        return result;
     }
 
     static final StateTtlConfig TTL_CONFIG = 
StateConfigUtil.createTtlConfig(1000);
@@ -92,6 +106,14 @@ public class SinkUpsertMaterializerTest {
                 }
             };
 
+    static final GeneratedHashFunction GENERATED_HASH_FUNCTION =
+            new GeneratedHashFunction("", "", new Object[0], new 
Configuration()) {
+                @Override
+                public HashFunction newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
     static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
@@ -101,6 +123,14 @@ public class SinkUpsertMaterializerTest {
                 }
             };
 
+    static final GeneratedHashFunction GENERATED_UPSERT_HASH_FUNCTION =
+            new GeneratedHashFunction("", "", new Object[0], new 
Configuration()) {
+                @Override
+                public HashFunction newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
     /**
      * If the composite serializer in {@link SinkUpsertMaterializer} works on 
projected fields then
      * it might use the wrong serializer, e.g. the {@link VarCharType} instead 
of the {@link
@@ -203,7 +233,11 @@ public class SinkUpsertMaterializerTest {
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        ASSERTOR.shouldEmitNothing(testHarness);
+        if (isTtlSupported()) {
+            ASSERTOR.shouldEmitNothing(testHarness);
+        } else {
+            ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 4L, 1, 
"a4"));
+        }
 
         testHarness.close();
     }
@@ -240,7 +274,11 @@ public class SinkUpsertMaterializerTest {
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        ASSERTOR.shouldEmitNothing(testHarness);
+        if (isTtlSupported()) {
+            ASSERTOR.shouldEmitNothing(testHarness);
+        } else {
+            ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 4L, 1, 
"a4"));
+        }
 
         testHarness.close();
     }
@@ -335,7 +373,7 @@ public class SinkUpsertMaterializerTest {
         testHarness.close();
     }
 
-    private static class TestRecordEqualiser implements RecordEqualiser {
+    private static class TestRecordEqualiser implements RecordEqualiser, 
HashFunction {
         @Override
         public boolean equals(RowData row1, RowData row2) {
             return row1.getRowKind() == row2.getRowKind()
@@ -343,15 +381,26 @@ public class SinkUpsertMaterializerTest {
                     && row1.getInt(1) == row2.getInt(1)
                     && row1.getString(2).equals(row2.getString(2));
         }
-    }
 
-    private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), 
rd.getString(2));
+        }
+    }
 
+    private static class TestUpsertKeyEqualiser implements RecordEqualiser, 
HashFunction {
         @Override
         public boolean equals(RowData row1, RowData row2) {
             return row1.getRowKind() == row2.getRowKind()
                     && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY);
         }
+
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getLong(UPSERT_KEY));
+        }
     }
 
     private OneInputStreamOperator<RowData, RowData> 
createOperatorWithoutUpsertKey() {
@@ -360,8 +409,51 @@ public class SinkUpsertMaterializerTest {
 
     private OneInputStreamOperator<RowData, RowData> createOperator(
             LogicalType[] types, int... upsertKey) {
-        return SinkUpsertMaterializer.create(
-                TTL_CONFIG, RowType.of(types), EQUALISER, 
UPSERT_KEY_EQUALISER, upsertKey);
+        switch (strategy) {
+            case LEGACY:
+                return SinkUpsertMaterializer.create(
+                        TTL_CONFIG, RowType.of(types), EQUALISER, 
UPSERT_KEY_EQUALISER, upsertKey);
+            case MAP:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.forMap(PROCESSING_TIME, 
getStateTtlConfig()));
+            case VALUE:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.forValue(
+                                PROCESSING_TIME, getStateTtlConfig()));
+            case ADAPTIVE:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.adaptive(
+                                PROCESSING_TIME, 10L, 5L, 
getStateTtlConfig()));
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown SinkUpsertMaterializeStrategy" + strategy);
+        }
+    }
+
+    private StateTtlConfig getStateTtlConfig() {
+        SinkUpsertMaterializerVersion version =
+                strategy == SinkUpsertMaterializeStrategy.LEGACY
+                        ? SinkUpsertMaterializerVersion.V1
+                        : SinkUpsertMaterializerVersion.V2;
+        return version.reconfigureTtl(TTL_CONFIG);
+    }
+
+    private static SinkUpsertMaterializerV2 createV2(
+            LogicalType[] types, int[] upsertKey, SequencedMultiSetStateConfig 
stateSettings) {
+        return SinkUpsertMaterializerV2.create(
+                RowType.of(types),
+                EQUALISER,
+                UPSERT_KEY_EQUALISER,
+                GENERATED_HASH_FUNCTION,
+                GENERATED_UPSERT_HASH_FUNCTION,
+                upsertKey,
+                stateSettings);
     }
 
     private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
@@ -460,4 +552,8 @@ public class SinkUpsertMaterializerTest {
             return harness.snapshotWithLocalState(newCheckpointID, 
newCheckpointID);
         }
     }
+
+    private boolean isTtlSupported() {
+        return strategy == SinkUpsertMaterializeStrategy.LEGACY;
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerVersion.java
similarity index 53%
copy from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
copy to 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerVersion.java
index 40fe6f456e6..41494ee24d8 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerVersion.java
@@ -16,26 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.sequencedmultisetstate;
+package org.apache.flink.table.runtime.operators.sink;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.api.common.state.StateTtlConfig;
 
-@Internal
-@FunctionalInterface
-public interface TimeSelector {
+/** Version of SinkUpsertMaterializer to test. */
+public enum SinkUpsertMaterializerVersion {
+    V1 {
+        @Override
+        public boolean isTtlSupported() {
+            return true;
+        }
+    },
+    V2 {
+        @Override
+        public boolean isTtlSupported() {
+            // todo: add support for TTL and remove checking related code
+            return false;
+        }
+    };
 
-    long getTimestamp(long elementTimestamp);
+    public abstract boolean isTtlSupported();
 
-    static TimeSelector getTimeDomain(TimeDomain timeDomain) {
-        switch (timeDomain) {
-            case EVENT_TIME:
-                return elementTimestamp -> elementTimestamp;
-            case PROCESSING_TIME:
-                return elementTimestamp -> 
SystemClock.getInstance().absoluteTimeMillis();
-            default:
-                throw new IllegalStateException("unknown time domain: " + 
timeDomain);
-        }
+    StateTtlConfig reconfigureTtl(StateTtlConfig ttlConfig) {
+        return isTtlSupported() ? ttlConfig : StateTtlConfig.DISABLED;
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
index f31db7c80b1..b68638115cf 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
@@ -59,6 +59,7 @@ import 
org.apache.flink.table.runtime.generated.RecordEqualiser;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy;
+import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext.KeyExtractor;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
@@ -79,7 +80,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.stream.LongStream;
 
 import static 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_ALL;
@@ -304,7 +304,7 @@ public class SequencedMultiSetStateTest {
     /** Test that loading and clearing the cache doesn't impact correctness. */
     @TestTemplate
     public void testKeyExtraction() throws Exception {
-        final Function<RowData, RowData> keyExtractor =
+        final KeyExtractor keyExtractor =
                 row -> ProjectedRowData.from(new int[] {1}).replaceRow(row);
 
         runTest(
@@ -471,14 +471,14 @@ public class SequencedMultiSetStateTest {
                             SequencedMultiSetState<RowData>, 
InternalKeyContext<String>, Exception>
                     test)
             throws Exception {
-        runTest(test, Function.identity(), KEY_POS);
+        runTest(test, new IdentityKeyExtractor(), KEY_POS);
     }
 
     private void runTest(
             BiConsumerWithException<
                             SequencedMultiSetState<RowData>, 
InternalKeyContext<String>, Exception>
                     test,
-            Function<RowData, RowData> keyExtractor,
+            KeyExtractor keyExtractor,
             int keyPos)
             throws Exception {
         SequencedMultiSetStateContext p =
@@ -644,4 +644,12 @@ public class SequencedMultiSetStateTest {
                 break;
         }
     }
+
+    private static class IdentityKeyExtractor implements KeyExtractor {
+
+        @Override
+        public RowData apply(RowData rowData) {
+            return rowData;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V2-snapshot
 
b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V2-snapshot
new file mode 100644
index 00000000000..956b64af2d1
Binary files /dev/null and 
b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V2-snapshot
 differ
diff --git 
a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V2-snapshot
 
b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V2-snapshot
new file mode 100644
index 00000000000..33308346686
Binary files /dev/null and 
b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V2-snapshot
 differ
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml 
b/flink-test-utils-parent/flink-test-utils/pom.xml
index b679676b6be..7b24c8e6e4b 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -42,6 +42,13 @@ under the License.
                        <scope>compile</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils-junit</artifactId>
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index badde2c002d..d8634db02ac 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
 
 import java.net.URL;
@@ -40,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;
 
 /** A {@link StreamExecutionEnvironment} that executes its jobs on {@link 
MiniCluster}. */
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
@@ -206,6 +208,13 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                 
ConfigOptions.key("table.exec.unbounded-over.version").intType().noDefaultValue(),
                 1,
                 2);
+        randomize(
+                conf,
+                TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
+                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY,
+                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.VALUE,
+                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.MAP,
+                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.ADAPTIVE);
     }
 
     /**

Reply via email to