This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83db8f73f79cb2787da05bc329bec276a4e52d08
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Sep 30 16:56:34 2025 +0200

    [FLINK-38464] Introduce OrderedMultiSetState
---
 .../AdaptiveSequencedMultiSetState.java            | 201 +++++++
 .../SequencedMultiSetState.java                    | 227 ++++++++
 .../SequencedMultiSetStateConfig.java              | 118 ++++
 .../SequencedMultiSetStateContext.java             |  52 ++
 .../sequencedmultisetstate/TimeSelector.java       |  41 ++
 .../ValueStateMultiSetState.java                   | 207 +++++++
 .../linked/LinkedMultiSetState.java                | 323 ++++++++++
 .../sequencedmultisetstate/linked/MetaSqnInfo.java |  58 ++
 .../linked/MetaSqnInfoSerializer.java              | 101 ++++
 .../sequencedmultisetstate/linked/Node.java        | 124 ++++
 .../linked/NodeSerializer.java                     | 210 +++++++
 .../sequencedmultisetstate/linked/RowDataKey.java  |  81 +++
 .../linked/RowDataKeySerializer.java               | 136 +++++
 .../linked/RowDataKeySerializerSnapshot.java       | 153 +++++
 .../sequencedmultisetstate/linked/RowSqnInfo.java  |  66 +++
 .../linked/RowSqnInfoSerializer.java               | 101 ++++
 .../SequencedMultiSetStateTest.java                | 647 +++++++++++++++++++++
 .../linked/MetaSqnInfoSerializerTest.java          |  46 ++
 .../linked/NodeSerializerTest.java                 |  53 ++
 .../linked/RowDataKeySerializerTest.java           | 104 ++++
 .../linked/RowSqnInfoSerializerTest.java           |  46 ++
 flink-tests/pom.xml                                |   8 +
 .../TypeSerializerTestCoverageTest.java            |   8 +
 23 files changed, 3111 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java
new file mode 100644
index 00000000000..8abfa76a8dc
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link SequencedMultiSetState} that switches dynamically between {@link
+ * ValueStateMultiSetState} and {@link LinkedMultiSetState} based on the number of elements.
+ */
+class AdaptiveSequencedMultiSetState implements SequencedMultiSetState<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSequencedMultiSetState.class);
+
+    private final ValueStateMultiSetState smallState;
+    private final LinkedMultiSetState largeState;
+    private final long switchToLargeThreshold;
+    private final long switchToSmallThreshold;
+
+    AdaptiveSequencedMultiSetState(
+            ValueStateMultiSetState smallState,
+            LinkedMultiSetState largeState,
+            long switchToLargeThreshold,
+            long switchToSmallThreshold) {
+        checkArgument(switchToLargeThreshold > switchToSmallThreshold);
+        this.smallState = smallState;
+        this.largeState = largeState;
+        this.switchToLargeThreshold = switchToLargeThreshold;
+        this.switchToSmallThreshold = switchToSmallThreshold;
+        LOG.info(
+                "Created {} with thresholds: {}=>large, {}=>small",
+                this.getClass().getSimpleName(),
+                switchToLargeThreshold,
+                switchToSmallThreshold);
+    }
+
+    @Override
+    public StateChangeInfo<RowData> add(RowData element, long timestamp) throws Exception {
+        return execute(
+                state -> state.add(element, timestamp), StateChangeInfo::getSizeAfter, "add");
+    }
+
+    @Override
+    public StateChangeInfo<RowData> append(RowData element, long timestamp) throws Exception {
+        return execute(
+                state -> state.append(element, timestamp), StateChangeInfo::getSizeAfter, "append");
+    }
+
+    @Override
+    public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
+        if (smallState.isEmpty()) {
+            return largeState.iterator();
+        } else {
+            return smallState.iterator();
+        }
+    }
+
+    @Override
+    public boolean isEmpty() throws IOException {
+        // large state check is faster
+        return largeState.isEmpty() && smallState.isEmpty();
+    }
+
+    public StateChangeInfo<RowData> remove(RowData element) throws Exception {
+        return execute(state -> state.remove(element), StateChangeInfo::getSizeAfter, "remove");
+    }
+
+    @Override
+    public void clear() {
+        clearCache();
+        smallState.clear();
+        largeState.clear();
+    }
+
+    @Override
+    public void loadCache() throws IOException {
+        smallState.loadCache();
+        largeState.loadCache();
+    }
+
+    @Override
+    public void clearCache() {
+        smallState.clearCache();
+        largeState.clearCache();
+    }
+
+    private <T> T execute(
+            FunctionWithException<SequencedMultiSetState<RowData>, T, Exception> stateOp,
+            Function<T, Long> getNewSize,
+            String action)
+            throws Exception {
+
+        final boolean isUsingLarge = isUsingLargeState();
+
+        // start with small state, i.e. choose smallState when both are empty
+        SequencedMultiSetState<RowData> currentState = isUsingLarge ? largeState : smallState;
+        SequencedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
+
+        T result = stateOp.apply(currentState);
+        final long sizeAfter = getNewSize.apply(result);
+
+        final boolean thresholdReached =
+                isUsingLarge
+                        ? sizeAfter <= switchToSmallThreshold
+                        : sizeAfter >= switchToLargeThreshold;
+
+        if (thresholdReached) {
+            LOG.debug(
+                    "Switch {} -> {} because '{}' resulted in state size reaching {} elements",
+                    currentState.getClass().getSimpleName(),
+                    otherState.getClass().getSimpleName(),
+                    action,
+                    sizeAfter);
+            switchState(currentState, otherState);
+        }
+
+        clearCache();
+        return result;
+    }
+
+    @VisibleForTesting
+    boolean isUsingLargeState() throws IOException {
+        smallState.loadCache();
+        if (!smallState.isEmpty()) {
+            return false;
+        }
+        largeState.loadCache();
+        return !largeState.isEmpty();
+    }
+
+    private void switchState(
+            SequencedMultiSetState<RowData> src, SequencedMultiSetState<RowData> dst)
+            throws Exception {
+        Iterator<Tuple2<RowData, Long>> it = src.iterator();
+        while (it.hasNext()) {
+            Tuple2<RowData, Long> next = it.next();
+            dst.append(next.f0, next.f1);
+        }
+        src.clear();
+    }
+
+    public static AdaptiveSequencedMultiSetState create(
+            SequencedMultiSetStateConfig sequencedMultiSetStateConfig,
+            String backendTypeIdentifier,
+            ValueStateMultiSetState smallState,
+            LinkedMultiSetState largeState) {
+        return new AdaptiveSequencedMultiSetState(
+                smallState,
+                largeState,
+                sequencedMultiSetStateConfig
+                        .getAdaptiveHighThresholdOverride()
+                        .orElse(
+                                isHeap(backendTypeIdentifier)
+                                        ? ADAPTIVE_HEAP_HIGH_THRESHOLD
+                                        : ADAPTIVE_ROCKSDB_HIGH_THRESHOLD),
+                sequencedMultiSetStateConfig
+                        .getAdaptiveLowThresholdOverride()
+                        .orElse(
+                                isHeap(backendTypeIdentifier)
+                                        ? ADAPTIVE_HEAP_LOW_THRESHOLD
+                                        : ADAPTIVE_ROCKSDB_LOW_THRESHOLD));
+    }
+
+    private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
+    private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
+    private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
+    private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;
+
+    private static boolean isHeap(String stateBackend) {
+        String trim = stateBackend.trim();
+        return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
+    }
+}
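
Note for reviewers: the two thresholds form a hysteresis band, so a size hovering
around a single boundary does not make the state flap between backends. A minimal,
self-contained sketch of that decision logic (hypothetical names, plain Java, not
part of this commit):

    // Hypothetical sketch of the switching decision used by AdaptiveSequencedMultiSetState.
    final class SwitchDecision {
        static boolean shouldSwitch(
                boolean usingLarge, long sizeAfter, long toLargeThreshold, long toSmallThreshold) {
            // Growing to the high threshold switches small -> large; shrinking to the
            // low threshold switches large -> small; sizes in between change nothing.
            return usingLarge ? sizeAfter <= toSmallThreshold : sizeAfter >= toLargeThreshold;
        }

        public static void main(String[] args) {
            // RocksDB defaults from this commit: 50 => large, 40 => small.
            System.out.println(shouldSwitch(false, 50, 50, 40)); // true: grow into the large state
            System.out.println(shouldSwitch(true, 45, 50, 40));  // false: stay large inside the band
            System.out.println(shouldSwitch(true, 40, 50, 40));  // true: shrink back to small state
        }
    }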
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java
new file mode 100644
index 00000000000..5f3140b6d24
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An interface for managing an ordered multi-set state in Apache Flink. It provides methods to
+ * add, append, and remove elements while maintaining insertion order.
+ *
+ * <p>The state supports two types of semantics for adding elements:
+ *
+ * <ul>
+ *   <li><b>Normal Set Semantics:</b> Replaces an existing matching element with the new one.
+ *   <li><b>Multi-Set Semantics:</b> Appends the new element, allowing duplicates.
+ * </ul>
+ *
+ * <p>Removal operations are supported with different result types, indicating the outcome of the
+ * removal process, such as whether all elements were removed, the last added element was removed,
+ * or no elements were removed.
+ *
+ * @param <T> The type of elements stored in the state.
+ */
+@Internal
+public interface SequencedMultiSetState<T> {
+
+    /**
+     * Add the given element using the normal (non-multi) set semantics: if a matching element
+     * exists already, replace it (the timestamp is updated).
+     */
+    StateChangeInfo<T> add(T element, long timestamp) throws Exception;
+
+    /** Add the given element using the multi-set semantics, i.e. append. */
+    StateChangeInfo<T> append(T element, long timestamp) throws Exception;
+
+    /**
+     * Remove the given element. If there are multiple instances of the same element, remove the
+     * first one in insertion order.
+     */
+    StateChangeInfo<T> remove(T element) throws Exception;
+
+    /** Represents a result of a state-changing operation. */
+    class StateChangeInfo<T> {
+        private final StateChangeType changeType;
+        private final long sizeBefore;
+        private final long sizeAfter;
+        @Nullable private final T payload; // depends on the change type
+
+        public static <T> StateChangeInfo<T> forAddition(long sizeBefore, long sizeAfter) {
+            return new StateChangeInfo<>(sizeBefore, sizeAfter, StateChangeType.ADDITION, null);
+        }
+
+        public static <T> StateChangeInfo<T> forRemovedLastAdded(
+                long sizeBefore, long sizeAfter, T payload) {
+            return new StateChangeInfo<>(
+                    sizeBefore, sizeAfter, StateChangeType.REMOVAL_LAST_ADDED, payload);
+        }
+
+        public static <T> StateChangeInfo<T> forRemovedOther(long sizeBefore, long sizeAfter) {
+            return new StateChangeInfo<>(
+                    sizeBefore, sizeAfter, StateChangeType.REMOVAL_OTHER, null);
+        }
+
+        public static <T> StateChangeInfo<T> forAllRemoved(
+                long sizeBefore, long sizeAfter, T payload) {
+            return new StateChangeInfo<>(
+                    sizeBefore, sizeAfter, StateChangeType.REMOVAL_ALL, payload);
+        }
+
+        public static <T> StateChangeInfo<T> forRemovalNotFound(long size) {
+            return new StateChangeInfo<>(size, size, StateChangeType.REMOVAL_NOT_FOUND, null);
+        }
+
+        private StateChangeInfo(
+                long sizeBefore, long sizeAfter, StateChangeType changeType, @Nullable T payload) {
+            changeType.validate(sizeBefore, sizeAfter, payload);
+            this.sizeBefore = sizeBefore;
+            this.sizeAfter = sizeAfter;
+            this.changeType = changeType;
+            this.payload = payload;
+        }
+
+        public long getSizeAfter() {
+            return sizeAfter;
+        }
+
+        public boolean wasEmpty() {
+            return sizeBefore == 0;
+        }
+
+        public StateChangeType getChangeType() {
+            return changeType;
+        }
+
+        /** The payload depends on the {@link StateChangeType}. */
+        public Optional<T> getPayload() {
+            return Optional.ofNullable(payload);
+        }
+    }
+
+    /** Get iterator over all remaining elements and their timestamps, in order of insertion. */
+    Iterator<Tuple2<T, Long>> iterator() throws Exception;
+
+    /** Tells whether any state exists (in the given key context). */
+    boolean isEmpty() throws IOException;
+
+    /** Clear the state (in the current key context). */
+    void clear();
+
+    /** Load cache. */
+    void loadCache() throws IOException;
+
+    /** Clear caches. */
+    void clearCache();
+
+    /** Type of a state change. */
+    enum StateChangeType {
+        /**
+         * An element was added or appended to the state. The result will not contain any elements.
+         */
+        ADDITION {
+            @Override
+            public <T> void validate(long sizeBefore, long sizeAfter, T payload) {
+                checkArgument(sizeAfter == sizeBefore + 1 || sizeAfter == sizeBefore);
+            }
+        },
+        /**
+         * Nothing was removed (e.g. as a result of TTL or not matching key), the result will not
+         * contain any elements.
+         */
+        REMOVAL_NOT_FOUND {
+            @Override
+            public <T> void validate(long sizeBefore, long sizeAfter, T payload) {
+                checkArgument(sizeAfter == sizeBefore);
+                checkArgument(payload == null);
+            }
+        },
+        /** All elements were removed. The result will contain the last removed element. */
+        REMOVAL_ALL {
+            @Override
+            public <T> void validate(long sizeBefore, long sizeAfter, T payload) {
+                checkArgument(sizeBefore > 0);
+                checkArgument(sizeAfter == 0);
+                checkNotNull(payload);
+            }
+        },
+        /**
+         * The most recently added element was removed. The result will contain the element added
+         * before it.
+         */
+        REMOVAL_LAST_ADDED {
+            @Override
+            public <T> void validate(long sizeBefore, long sizeAfter, T payload) {
+                checkArgument(sizeAfter == sizeBefore - 1);
+                checkNotNull(payload);
+            }
+        },
+        /**
+         * An element was removed; it was not the most recently added, and more elements remain.
+         * The result will not contain any elements.
+         */
+        REMOVAL_OTHER {
+            @Override
+            public <T> void validate(long sizeBefore, long sizeAfter, T payload) {
+                checkArgument(sizeAfter == sizeBefore - 1);
+                checkArgument(payload == null);
+            }
+        };
+
+        public abstract <T> void validate(long sizeBefore, long sizeAfter, T payload);
+    }
+
+    enum Strategy {
+        VALUE_STATE,
+        MAP_STATE,
+        ADAPTIVE
+    }
+
+    static SequencedMultiSetState<RowData> create(
+            SequencedMultiSetStateContext parameters,
+            RuntimeContext ctx,
+            String backendTypeIdentifier) {
+        switch (parameters.config.getStrategy()) {
+            case MAP_STATE:
+                return LinkedMultiSetState.create(parameters, ctx);
+            case VALUE_STATE:
+                return ValueStateMultiSetState.create(parameters, ctx);
+            case ADAPTIVE:
+                return AdaptiveSequencedMultiSetState.create(
+                        parameters.config,
+                        backendTypeIdentifier,
+                        ValueStateMultiSetState.create(parameters, ctx),
+                        LinkedMultiSetState.create(parameters, ctx));
+            default:
+                throw new UnsupportedOperationException(parameters.config.getStrategy().name());
+        }
+    }
+}
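
For context on the removal result types above: which StateChangeType a removal produces
is fully determined by the index of the removed element and the size before removal. A
self-contained sketch of that mapping (hypothetical helper, not part of this commit):

    // Hypothetical model of how removal outcomes map to StateChangeType.
    final class RemovalOutcomes {
        enum Outcome { REMOVAL_NOT_FOUND, REMOVAL_ALL, REMOVAL_LAST_ADDED, REMOVAL_OTHER }

        static Outcome classify(long sizeBefore, long removedIdx) {
            if (removedIdx < 0) {
                return Outcome.REMOVAL_NOT_FOUND;    // nothing matched
            } else if (sizeBefore == 1) {
                return Outcome.REMOVAL_ALL;          // state is now empty
            } else if (removedIdx == sizeBefore - 1) {
                return Outcome.REMOVAL_LAST_ADDED;   // most recently added element removed
            } else {
                return Outcome.REMOVAL_OTHER;        // some earlier element removed
            }
        }

        public static void main(String[] args) {
            System.out.println(classify(3, -1)); // REMOVAL_NOT_FOUND
            System.out.println(classify(1, 0));  // REMOVAL_ALL
            System.out.println(classify(3, 2));  // REMOVAL_LAST_ADDED
            System.out.println(classify(3, 0));  // REMOVAL_OTHER
        }
    }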
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
new file mode 100644
index 00000000000..a6b16c343d8
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.TimeDomain;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Configuration for {@link SequencedMultiSetState}. */
+public class SequencedMultiSetStateConfig {
+
+    private final SequencedMultiSetState.Strategy strategy;
+    private final @Nullable Long adaptiveHighThresholdOverride;
+    private final @Nullable Long adaptiveLowThresholdOverride;
+    private final StateTtlConfig ttlConfig;
+    private final TimeSelector ttlTimeSelector;
+
+    public SequencedMultiSetStateConfig(
+            SequencedMultiSetState.Strategy strategy,
+            @Nullable Long adaptiveHighThresholdOverride,
+            @Nullable Long adaptiveLowThresholdOverride,
+            StateTtlConfig ttlConfig,
+            TimeDomain ttlTimeDomain) {
+        this(
+                strategy,
+                adaptiveHighThresholdOverride,
+                adaptiveLowThresholdOverride,
+                ttlConfig,
+                TimeSelector.getTimeDomain(ttlTimeDomain));
+    }
+
+    public SequencedMultiSetStateConfig(
+            SequencedMultiSetState.Strategy strategy,
+            @Nullable Long adaptiveHighThresholdOverride,
+            @Nullable Long adaptiveLowThresholdOverride,
+            StateTtlConfig ttlConfig,
+            TimeSelector ttlTimeSelector) {
+        checkArgument(
+                !ttlConfig.isEnabled(),
+                "TTL is not supported"); // https://issues.apache.org/jira/browse/FLINK-38463
+        this.strategy = strategy;
+        this.adaptiveHighThresholdOverride = adaptiveHighThresholdOverride;
+        this.adaptiveLowThresholdOverride = adaptiveLowThresholdOverride;
+        this.ttlConfig = ttlConfig;
+        this.ttlTimeSelector = ttlTimeSelector;
+    }
+
+    public static SequencedMultiSetStateConfig defaults(
+            TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) {
+        return forValue(ttlTimeDomain, ttlConfig);
+    }
+
+    public static SequencedMultiSetStateConfig forMap(
+            TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) {
+        return new SequencedMultiSetStateConfig(
+                SequencedMultiSetState.Strategy.MAP_STATE, null, null, ttlConfig, ttlTimeDomain);
+    }
+
+    public static SequencedMultiSetStateConfig forValue(
+            TimeDomain ttlTimeDomain, StateTtlConfig ttl) {
+        return new SequencedMultiSetStateConfig(
+                SequencedMultiSetState.Strategy.VALUE_STATE, null, null, ttl, ttlTimeDomain);
+    }
+
+    public static SequencedMultiSetStateConfig adaptive(
+            TimeDomain ttlTimeDomain,
+            long adaptiveHighThresholdOverride,
+            long adaptiveLowThresholdOverride,
+            StateTtlConfig ttl) {
+        return new SequencedMultiSetStateConfig(
+                SequencedMultiSetState.Strategy.ADAPTIVE,
+                adaptiveHighThresholdOverride,
+                adaptiveLowThresholdOverride,
+                ttl,
+                ttlTimeDomain);
+    }
+
+    public TimeSelector getTimeSelector() {
+        return ttlTimeSelector;
+    }
+
+    public SequencedMultiSetState.Strategy getStrategy() {
+        return strategy;
+    }
+
+    public Optional<Long> getAdaptiveHighThresholdOverride() {
+        return Optional.ofNullable(adaptiveHighThresholdOverride);
+    }
+
+    public Optional<Long> getAdaptiveLowThresholdOverride() {
+        return Optional.ofNullable(adaptiveLowThresholdOverride);
+    }
+
+    public StateTtlConfig getTtlConfig() {
+        return ttlConfig;
+    }
+}
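
A usage sketch of the factory methods above; note that TTL must currently be disabled
(the constructor rejects enabled TTL until FLINK-38463 is resolved). StateTtlConfig.DISABLED
and TimeDomain.EVENT_TIME are existing Flink constants; the wrapper class is hypothetical:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.streaming.api.TimeDomain;
    import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;

    class ConfigExample {
        static SequencedMultiSetStateConfig adaptiveConfig() {
            // Override the backend-specific defaults with custom high/low thresholds.
            return SequencedMultiSetStateConfig.adaptive(
                    TimeDomain.EVENT_TIME, /* high */ 500, /* low */ 350, StateTtlConfig.DISABLED);
        }
    }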
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
new file mode 100644
index 00000000000..b7412d2abcd
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+
+import java.util.function.Function;
+
+/** {@link SequencedMultiSetState} (creation) context. */
+public class SequencedMultiSetStateContext {
+
+    public final SequencedMultiSetStateConfig config;
+    public final TypeSerializer<RowData> keySerializer;
+    public final GeneratedRecordEqualiser generatedKeyEqualiser;
+    public final GeneratedHashFunction generatedKeyHashFunction;
+    public final TypeSerializer<RowData> recordSerializer;
+    public final Function<RowData, RowData> keyExtractor;
+
+    public SequencedMultiSetStateContext(
+            TypeSerializer<RowData> keySerializer,
+            GeneratedRecordEqualiser generatedKeyEqualiser,
+            GeneratedHashFunction generatedKeyHashFunction,
+            TypeSerializer<RowData> recordSerializer,
+            Function<RowData, RowData> keyExtractor,
+            SequencedMultiSetStateConfig config) {
+        this.keySerializer = keySerializer;
+        this.generatedKeyEqualiser = generatedKeyEqualiser;
+        this.generatedKeyHashFunction = generatedKeyHashFunction;
+        this.recordSerializer = recordSerializer;
+        this.keyExtractor = keyExtractor;
+        this.config = config;
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
new file mode 100644
index 00000000000..40fe6f456e6
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.util.clock.SystemClock;
+
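+/** Selects the timestamp to store for an element, depending on the {@link TimeDomain}. */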
+@Internal
+@FunctionalInterface
+public interface TimeSelector {
+
+    long getTimestamp(long elementTimestamp);
+
+    static TimeSelector getTimeDomain(TimeDomain timeDomain) {
+        switch (timeDomain) {
+            case EVENT_TIME:
+                return elementTimestamp -> elementTimestamp;
+            case PROCESSING_TIME:
+                return elementTimestamp -> SystemClock.getInstance().absoluteTimeMillis();
+            default:
+                throw new IllegalStateException("unknown time domain: " + timeDomain);
+        }
+    }
+}
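
A behavioral sketch of the two selectors: EVENT_TIME passes the element timestamp
through, while PROCESSING_TIME ignores it and reads the wall clock (illustrative
snippet, class name hypothetical, not part of this commit):

    import org.apache.flink.streaming.api.TimeDomain;
    import org.apache.flink.table.runtime.sequencedmultisetstate.TimeSelector;

    class TimeSelectorExample {
        public static void main(String[] args) {
            TimeSelector eventTime = TimeSelector.getTimeDomain(TimeDomain.EVENT_TIME);
            TimeSelector procTime = TimeSelector.getTimeDomain(TimeDomain.PROCESSING_TIME);
            System.out.println(eventTime.getTimestamp(1000L)); // 1000: element time is kept
            System.out.println(procTime.getTimestamp(1000L));  // current wall-clock millis instead
        }
    }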
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java
new file mode 100644
index 00000000000..3bc88dc67e3
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Simple implementation of {@link SequencedMultiSetState} based on plain {@code ValueState<List>}.
+ */
+class ValueStateMultiSetState implements SequencedMultiSetState<RowData> {
+
+    private final ValueState<List<Tuple2<RowData, Long>>> valuesState;
+    private final RecordEqualiser keyEqualiser;
+    private final Function<RowData, RowData> keyExtractor;
+    private final TimeSelector timeSelector;
+    private List<Tuple2<RowData, Long>> cache;
+
+    ValueStateMultiSetState(
+            ValueState<List<Tuple2<RowData, Long>>> valuesState,
+            RecordEqualiser keyEqualiser,
+            Function<RowData, RowData> keyExtractor,
+            TimeSelector timeSelector) {
+        this.valuesState = valuesState;
+        this.keyEqualiser = keyEqualiser;
+        this.keyExtractor = keyExtractor;
+        this.timeSelector = timeSelector;
+    }
+
+    public static ValueStateMultiSetState create(
+            SequencedMultiSetStateContext p, RuntimeContext ctx) {
+        //noinspection rawtypes,unchecked
+        return new ValueStateMultiSetState(
+                ctx.getState(
+                        new ValueStateDescriptor<>(
+                                "list",
+                                new ListSerializer<>(
+                                        new TupleSerializer(
+                                                Tuple2.class,
+                                                new TypeSerializer[] {
+                                                    p.recordSerializer, LongSerializer.INSTANCE
+                                                })))),
+                p.generatedKeyEqualiser.newInstance(ctx.getUserCodeClassLoader()),
+                p.keyExtractor,
+                p.config.getTimeSelector());
+    }
+
+    @Override
+    public StateChangeInfo<RowData> add(RowData row, long ts) throws Exception {
+        normalizeRowKind(row);
+        final Tuple2<RowData, Long> toAdd = Tuple2.of(row, timeSelector.getTimestamp(ts));
+        final RowData key = asKey(row);
+        final List<Tuple2<RowData, Long>> list = maybeReadState();
+        final long oldSize = list.size();
+
+        int idx = Integer.MIN_VALUE;
+        int i = 0;
+        for (Tuple2<RowData, Long> t : list) {
+            if (keyEqualiser.equals(asKey(t.f0), key)) {
+                idx = i;
+                break;
+            }
+            i++;
+        }
+        if (idx < 0) {
+            list.add(toAdd);
+        } else {
+            list.set(idx, toAdd);
+        }
+        valuesState.update(list);
+        return StateChangeInfo.forAddition(oldSize, list.size());
+    }
+
+    @Override
+    public StateChangeInfo<RowData> append(RowData row, long timestamp) throws Exception {
+        normalizeRowKind(row);
+        List<Tuple2<RowData, Long>> values = maybeReadState();
+        final long oldSize = values.size();
+        values.add(Tuple2.of(row, timeSelector.getTimestamp(timestamp)));
+        valuesState.update(values);
+        return StateChangeInfo.forAddition(oldSize, values.size());
+    }
+
+    @Override
+    public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
+        return maybeReadState().iterator();
+    }
+
+    @Override
+    public StateChangeInfo<RowData> remove(RowData row) throws Exception {
+        normalizeRowKind(row);
+        final RowData key = asKey(row);
+        final List<Tuple2<RowData, Long>> list = maybeReadState();
+        final int oldSize = list.size();
+
+        int dropIdx = Integer.MIN_VALUE;
+        RowData last = null;
+        int i = 0;
+        for (Tuple2<RowData, Long> t : list) {
+            if (keyEqualiser.equals(key, asKey(t.f0))) {
+                dropIdx = i;
+                break;
+            } else {
+                last = t.f0;
+            }
+            i++;
+        }
+        final RowData removed;
+        if (dropIdx >= 0) {
+            list.remove(dropIdx);
+            removed = row;
+            valuesState.update(list);
+        } else {
+            removed = null;
+        }
+        return toRemovalResult(oldSize, list.size(), dropIdx, removed, last);
+    }
+
+    @Override
+    public void loadCache() throws IOException {
+        cache = readState();
+    }
+
+    @Override
+    public void clearCache() {
+        cache = null;
+    }
+
+    private List<Tuple2<RowData, Long>> maybeReadState() throws IOException {
+        if (cache != null) {
+            return cache;
+        }
+        return readState();
+    }
+
+    private List<Tuple2<RowData, Long>> readState() throws IOException {
+        List<Tuple2<RowData, Long>> value = valuesState.value();
+        if (value == null) {
+            value = new ArrayList<>();
+        }
+        return value;
+    }
+
+    @Override
+    public void clear() {
+        clearCache();
+        valuesState.clear();
+    }
+
+    @Override
+    public boolean isEmpty() throws IOException {
+        List<Tuple2<RowData, Long>> list = cache == null ? valuesState.value() : cache;
+        return list == null || list.isEmpty();
+    }
+
+    private RowData asKey(RowData row) {
+        return keyExtractor.apply(row);
+    }
+
+    private static void normalizeRowKind(RowData row) {
+        row.setRowKind(RowKind.INSERT);
+    }
+
+    private static StateChangeInfo<RowData> toRemovalResult(
+            long oldSize, long newSize, int dropIdx, RowData removed, RowData last) {
+        if (dropIdx < 0) {
+            return StateChangeInfo.forRemovalNotFound(oldSize);
+        } else if (newSize == 0) {
+            return StateChangeInfo.forAllRemoved(oldSize, newSize, removed);
+        } else if (dropIdx == oldSize - 1) {
+            return StateChangeInfo.forRemovedLastAdded(oldSize, newSize, last);
+        } else {
+            return StateChangeInfo.forRemovedOther(oldSize, newSize);
+        }
+    }
+}
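
The difference between add (set semantics: replace the first match in place) and append
(multi-set semantics: always grow) on the backing list can be shown without any Flink
machinery. A toy model in which a string stands in for the extracted row key
(hypothetical, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the list semantics above; entry[0] is the key, entry[1] the timestamp.
    final class ListSemantics {
        static void add(List<String[]> list, String key, String ts) {
            for (int i = 0; i < list.size(); i++) {
                if (list.get(i)[0].equals(key)) {
                    list.set(i, new String[] {key, ts}); // set semantics: replace in place
                    return;
                }
            }
            list.add(new String[] {key, ts}); // first occurrence: append at the end
        }

        public static void main(String[] args) {
            List<String[]> list = new ArrayList<>();
            add(list, "a", "t1");
            add(list, "a", "t2");               // replaced, size stays 1, position kept
            list.add(new String[] {"a", "t3"}); // append semantics: duplicate, size 2
            System.out.println(list.size());    // 2
        }
    }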
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java
new file mode 100644
index 00000000000..840fe007ebf
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
+import org.apache.flink.table.runtime.sequencedmultisetstate.TimeSelector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class implements an ordered multi-set state backend using Flink's state primitives. It
+ * maintains the insertion order of elements and supports operations such as adding, appending, and
+ * removing elements. The state is backed by Flink's `MapState` and `ValueState` to store and
+ * manage the relationships between rows and sequence numbers (SQNs).
+ *
+ * <p>Key features of this state implementation:
+ *
+ * <ul>
+ *   <li>Maintains insertion order of elements using a doubly-linked list structure.
+ *   <li>Supports both normal set semantics (replacing existing elements) and multi-set semantics
+ *       (allowing duplicates).
+ *   <li>Efficiently tracks the highest sequence number and links between elements for fast
+ *       traversal and updates.
+ *   <li>Provides methods to add, append, and remove elements with appropriate handling of state
+ *       transitions.
+ * </ul>
+ *
+ * <p>Note: This implementation is marked as {@code @Internal} and is intended for internal use
+ * within Flink. It may be subject to changes in future versions.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li>Use the {@link #add(RowData, long)} method to add an element, replacing any existing
+ *       matching element.
+ *   <li>Use the {@link #append(RowData, long)} method to append an element, allowing duplicates.
+ *   <li>Use the {@link #remove(RowData)} method to remove an element, with detailed removal result
+ *       types.
+ * </ul>
+ *
+ * @see SequencedMultiSetState
+ * @see org.apache.flink.api.common.state.MapState
+ * @see org.apache.flink.api.common.state.ValueState
+ */
+@Internal
+public class LinkedMultiSetState implements SequencedMultiSetState<RowData> {
+
+    // maps rows to SQNs (first and last SQN for a row; the same in case of an upsert key)
+    private final MapState<RowDataKey, RowSqnInfo> rowToSqnState;
+    // maps SQNs to Nodes, which comprise a doubly-linked list
+    private final MapState<Long, Node> sqnToNodeState;
+    // highest sequence number; also latest emitted downstream
+    private final ValueState<MetaSqnInfo> highestSqnAndSizeState;
+
+    private final RecordEqualiser keyEqualiser;
+    private final HashFunction keyHashFunction;
+    private final Function<RowData, RowData> keyExtractor;
+    private final TimeSelector timeSelector;
+
+    private LinkedMultiSetState(
+            MapState<RowDataKey, RowSqnInfo> rowToSqnState,
+            MapState<Long, Node> sqnToNodeState,
+            ValueState<MetaSqnInfo> highestSqnAndSizeState,
+            RecordEqualiser keyEqualiser,
+            HashFunction keyHashFunction,
+            Function<RowData, RowData> keyExtractor,
+            TimeSelector timeSelector) {
+        this.rowToSqnState = checkNotNull(rowToSqnState);
+        this.sqnToNodeState = checkNotNull(sqnToNodeState);
+        this.highestSqnAndSizeState = checkNotNull(highestSqnAndSizeState);
+        this.keyEqualiser = checkNotNull(keyEqualiser);
+        this.keyHashFunction = checkNotNull(keyHashFunction);
+        this.keyExtractor = keyExtractor;
+        this.timeSelector = timeSelector;
+    }
+
+    public static LinkedMultiSetState create(SequencedMultiSetStateContext p, RuntimeContext ctx) {
+
+        RecordEqualiser keyEqualiser =
+                p.generatedKeyEqualiser.newInstance(ctx.getUserCodeClassLoader());
+        HashFunction keyHashFunction =
+                p.generatedKeyHashFunction.newInstance(ctx.getUserCodeClassLoader());
+
+        MapState<RowDataKey, RowSqnInfo> rowToSqnState =
+                ctx.getMapState(
+                        new MapStateDescriptor<>(
+                                "rowToSqnState",
+                                new RowDataKeySerializer(
+                                        p.keySerializer,
+                                        keyEqualiser,
+                                        keyHashFunction,
+                                        p.generatedKeyEqualiser,
+                                        p.generatedKeyHashFunction),
+                                new RowSqnInfoSerializer()));
+        MapState<Long, Node> sqnToNodeState =
+                ctx.getMapState(
+                        new MapStateDescriptor<>(
+                                "sqnToNodeState",
+                                LongSerializer.INSTANCE,
+                                new NodeSerializer(p.recordSerializer)));
+
+        ValueState<MetaSqnInfo> highestSqnState =
+                ctx.getState(
+                        new ValueStateDescriptor<>("highestSqnState", new MetaSqnInfoSerializer()));
+        return new LinkedMultiSetState(
+                rowToSqnState,
+                sqnToNodeState,
+                highestSqnState,
+                keyEqualiser,
+                keyHashFunction,
+                p.keyExtractor,
+                p.config.getTimeSelector());
+    }
+
+    @Override
+    public StateChangeInfo<RowData> add(RowData row, long timestamp) throws Exception {
+        final RowDataKey key = toKey(row);
+        final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value();
+        final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn;
+        final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size;
+        final RowSqnInfo rowSqnInfo = rowToSqnState.get(key);
+        final Long rowSqn = rowSqnInfo == null ? null : rowSqnInfo.firstSqn;
+        final boolean isNewRowKey = rowSqn == null; // the 1st such record 'row'
+        final boolean isNewContextKey = highSqn == null; // the 1st record for the current context key
+
+        final Long oldSqn;
+        final long newSqn;
+        final long newSize;
+
+        if (isNewContextKey && isNewRowKey) {
+            // no state at all for this context key
+            oldSqn = null;
+            newSqn = 0;
+            newSize = 1;
+        } else if (isNewRowKey) {
+            // add new rowKey "to the end"
+            oldSqn = null;
+            newSqn = highSqn + 1;
+            newSize = oldSize + 1;
+        } else {
+            // replace an existing row by rowKey
+            oldSqn = newSqn = rowSqn;
+            newSize = oldSize;
+        }
+
+        timestamp = timeSelector.getTimestamp(timestamp);
+
+        sqnToNodeState.put(
+                newSqn,
+                isNewRowKey
+                        ? new Node(row, newSqn, highSqn, null, null, timestamp)
+                        : sqnToNodeState.get(oldSqn).withRow(row, timestamp));
+        highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
+        if (isNewRowKey) {
+            rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn));
+            if (!isNewContextKey) {
+                sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
+            }
+        }
+        return StateChangeInfo.forAddition(oldSize, newSize);
+    }
+
+    @Override
+    public StateChangeInfo<RowData> append(RowData row, long timestamp) throws Exception {
+        final RowDataKey key = toKey(row);
+        final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value();
+        final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn;
+        final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size;
+        final boolean existed = highSqn != null;
+        final long newSqn = (existed ? highSqn + 1 : 0);
+        final Node newNode =
+                new Node(
+                        row,
+                        newSqn,
+                        highSqn,
+                        /* next */ null,
+                        /* nextForRecord */ null,
+                        timeSelector.getTimestamp(timestamp));
+        final long newSize = oldSize + 1;
+
+        final RowSqnInfo sqnInfo = existed ? rowToSqnState.get(key) : null;
+        final Long rowSqn = sqnInfo == null ? null : sqnInfo.firstSqn;
+        if (rowSqn == null) {
+            rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn));
+        } else {
+            rowToSqnState.put(key, RowSqnInfo.of(rowSqn, newSqn));
+            sqnToNodeState.put(
+                    sqnInfo.lastSqn, sqnToNodeState.get(sqnInfo.lastSqn).withNextForRecord(newSqn));
+        }
+        highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
+        sqnToNodeState.put(newSqn, newNode);
+        if (existed) {
+            sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
+        }
+        return StateChangeInfo.forAddition(oldSize, newSize);
+    }
+
+    public StateChangeInfo<RowData> remove(RowData row) throws Exception {
+        final RowDataKey key = toKey(row);
+        final RowSqnInfo sqnInfo = rowToSqnState.get(key);
+        final Long rowSqn = sqnInfo == null ? null : sqnInfo.firstSqn;
+        final MetaSqnInfo highSqnStateAndSize = highestSqnAndSizeState.value();
+        final long oldSize = highSqnStateAndSize == null ? 0L : highSqnStateAndSize.size;
+        if (rowSqn == null) {
+            return StateChangeInfo.forRemovalNotFound(oldSize);
+        }
+        final Node node = sqnToNodeState.get(rowSqn);
+
+        final Node prev = removeNode(node, key, highSqnStateAndSize);
+
+        if (!node.isHighestSqn()) {
+            return StateChangeInfo.forRemovedOther(oldSize, oldSize - 1);
+        } else if (prev == null) {
+            return StateChangeInfo.forAllRemoved(oldSize, oldSize - 1, row);
+        } else {
+            return StateChangeInfo.forRemovedLastAdded(oldSize, oldSize - 1, prev.row);
+        }
+    }
+
+    @Override
+    public void clear() {
+        clearCache();
+        sqnToNodeState.clear();
+        highestSqnAndSizeState.clear();
+        rowToSqnState.clear();
+    }
+
+    @Override
+    public void loadCache() {}
+
+    @Override
+    public void clearCache() {}
+
+    private Node removeNode(Node node, RowDataKey key, MetaSqnInfo highSqnStateAndSize)
+            throws Exception {
+
+        if (node.isLowestSqn() && node.isHighestSqn()) {
+            // fast track: if last record for PK then cleanup everything and return
+            clear();
+            return null;
+        }
+
+        sqnToNodeState.remove(node.getSqn());
+        highestSqnAndSizeState.update(
+                MetaSqnInfo.of(
+                        node.isHighestSqn() ? node.prevSqn : highSqnStateAndSize.highSqn,
+                        highSqnStateAndSize.size - 1));
+        if (node.isLastForRecord()) {
+            rowToSqnState.remove(key);
+        } else {
+            rowToSqnState.put(key, rowToSqnState.get(key).withFirstSqn(node.nextSqnForRecord));
+        }
+        // link prev node to next
+        Node prev = null;
+        if (node.hasPrev()) {
+            prev = sqnToNodeState.get(node.prevSqn).withNext(node.nextSqn);
+            sqnToNodeState.put(node.prevSqn, prev);
+        }
+        // link next node to prev
+        if (node.hasNext()) {
+            sqnToNodeState.put(
+                    node.nextSqn, sqnToNodeState.get(node.nextSqn).withPrev(node.prevSqn));
+        }
+        return prev;
+    }
+
+    @Override
+    public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
+        // this can be implemented more efficiently
+        // however, the expected use case is to migrate all the values either to or from the memory
+        // state backend, so loading all into memory seems fine
+        List<Node> list = new ArrayList<>();
+        for (Node node : sqnToNodeState.values()) {
+            list.add(node);
+        }
+        list.sort(Comparator.comparingLong(Node::getSqn));
+        return list.stream().map(node -> Tuple2.of(node.row, node.timestamp)).iterator();
+    }
+
+    @Override
+    public boolean isEmpty() throws IOException {
+        return highestSqnAndSizeState.value() == null;
+    }
+
+    private RowDataKey toKey(RowData row0) {
+        return RowDataKey.toKey(keyExtractor.apply(row0), keyEqualiser, keyHashFunction);
+    }
+}
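
The node layout above can be pictured with plain HashMaps standing in for MapState:
every append creates a node pointing back at the previous tail, and the old tail is
relinked forward. A toy model (hypothetical names, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of the SQN-linked structure; HashMap stands in for Flink's MapState.
    final class LinkedSqnModel {
        static final class Node {
            final String row;
            final Long prev;
            Long next;

            Node(String row, Long prev) {
                this.row = row;
                this.prev = prev;
            }
        }

        private final Map<Long, Node> sqnToNode = new HashMap<>();
        private Long highSqn; // null => empty; mirrors highestSqnAndSizeState

        void append(String row) {
            long newSqn = highSqn == null ? 0 : highSqn + 1;
            sqnToNode.put(newSqn, new Node(row, highSqn));
            if (highSqn != null) {
                sqnToNode.get(highSqn).next = newSqn; // relink the old tail forward
            }
            highSqn = newSqn;
        }

        public static void main(String[] args) {
            LinkedSqnModel s = new LinkedSqnModel();
            s.append("r1");
            s.append("r2");
            s.append("r3");
            // Walk back from the tail in reverse insertion order: r3, r2, r1.
            for (Long sqn = s.highSqn; sqn != null; sqn = s.sqnToNode.get(sqn).prev) {
                System.out.println(s.sqnToNode.get(sqn).row);
            }
        }
    }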
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java
new file mode 100644
index 00000000000..a7354872ece
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/** Stores the highest SQN and the current size for the current key. */
+class MetaSqnInfo {
+    public final long highSqn;
+    public final long size;
+
+    public MetaSqnInfo(long highSqn, long size) {
+        Preconditions.checkArgument(size >= 0);
+        this.highSqn = highSqn;
+        this.size = size;
+    }
+
+    public static MetaSqnInfo of(long highSqn, long size) {
+        return new MetaSqnInfo(highSqn, size);
+    }
+
+    @Override
+    public String toString() {
+        return "MetaSqnInfo{" + "firstSqn=" + highSqn + ", lastSqn=" + size + 
'}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MetaSqnInfo)) {
+            return false;
+        }
+        MetaSqnInfo that = (MetaSqnInfo) o;
+        return highSqn == that.highSqn && size == that.size;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(highSqn, size);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java
new file mode 100644
index 00000000000..4819d6d4dfd
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+
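+/** Serializer for {@link MetaSqnInfo}. */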
+@SuppressWarnings("ClassEscapesDefinedScope")
+public class MetaSqnInfoSerializer extends CompositeSerializer<MetaSqnInfo> {
+
+    public MetaSqnInfoSerializer() {
+        this(null, LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+    }
+
+    protected MetaSqnInfoSerializer(
+            PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) {
+        super(
+                PrecomputedParameters.precompute(
+                        true, true, (TypeSerializer<Object>[]) fieldSerializers),
+                fieldSerializers);
+    }
+
+    @Override
+    public MetaSqnInfo createInstance(Object... values) {
+        return new MetaSqnInfo((Long) values[0], (Long) values[1]);
+    }
+
+    @Override
+    protected void setField(MetaSqnInfo sqnInfo, int index, Object fieldValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected Object getField(MetaSqnInfo value, int index) {
+        switch (index) {
+            case 0:
+                return value.highSqn;
+            case 1:
+                return value.size;
+            default:
+                throw new IllegalArgumentException("invalid index: " + index);
+        }
+    }
+
+    @Override
+    protected CompositeSerializer<MetaSqnInfo> createSerializerInstance(
+            PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) {
+        return new MetaSqnInfoSerializer(precomputed, originalSerializers);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<MetaSqnInfo> snapshotConfiguration() {
+        return new MetaSqnInfoSerializerSnapshot(this);
+    }
+
+    public static class MetaSqnInfoSerializerSnapshot
+            extends CompositeTypeSerializerSnapshot<MetaSqnInfo, MetaSqnInfoSerializer> {
+
+        @SuppressWarnings("unused")
+        public MetaSqnInfoSerializerSnapshot() {}
+
+        MetaSqnInfoSerializerSnapshot(MetaSqnInfoSerializer serializer) {
+            super(serializer);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return 0;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(MetaSqnInfoSerializer outerSerializer) {
+            return new TypeSerializer[] {LongSerializer.INSTANCE, LongSerializer.INSTANCE};
+        }
+
+        @Override
+        protected MetaSqnInfoSerializer createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] nestedSerializers) {
+            return new MetaSqnInfoSerializer(null, nestedSerializers);
+        }
+    }
+}
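
The serializer can be exercised with the in-memory data views from flink-core;
a minimal round-trip sketch (illustrative only; throws IOException):

    MetaSqnInfoSerializer serializer = new MetaSqnInfoSerializer();
    DataOutputSerializer out = new DataOutputSerializer(16);
    serializer.serialize(MetaSqnInfo.of(42L, 3L), out);
    MetaSqnInfo copy =
            serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
    // copy.equals(MetaSqnInfo.of(42L, 3L)) holds

(DataOutputSerializer and DataInputDeserializer are org.apache.flink.core.memory classes.)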
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java
new file mode 100644
index 00000000000..232437ab310
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.Objects;
+
+/** A linked-list node of {@link LinkedMultiSetState}, addressed by its SQN. */
+class Node {
+    final RowData row;
+    private final long sqn;
+    final Long prevSqn;
+    final Long nextSqn;
+    final Long nextSqnForRecord;
+    final Long timestamp; // for future TTL support
+
+    Node(RowData row, long sqn, Long prevSqn, Long nextSqn, Long nextSqnForRecord, Long timestamp) {
+        this.row = row;
+        this.sqn = sqn;
+        this.prevSqn = prevSqn;
+        this.nextSqn = nextSqn;
+        this.nextSqnForRecord = nextSqnForRecord;
+        this.timestamp = timestamp;
+    }
+
+    public boolean isLastForRecord() {
+        return nextSqnForRecord == null;
+    }
+
+    public boolean isLowestSqn() {
+        return !hasPrev();
+    }
+
+    public boolean isHighestSqn() {
+        return !hasNext();
+    }
+
+    public boolean hasPrev() {
+        return prevSqn != null;
+    }
+
+    public boolean hasNext() {
+        return nextSqn != null;
+    }
+
+    public Node withNextForRecord(Long nextSeqNoForRecord) {
+        return new Node(row, sqn, prevSqn, nextSqn, nextSeqNoForRecord, timestamp);
+    }
+
+    public Node withNext(Long nextSeqNo) {
+        return new Node(row, sqn, prevSqn, nextSeqNo, nextSqnForRecord, timestamp);
+    }
+
+    public Node withPrev(Long prevSeqNo) {
+        return new Node(row, sqn, prevSeqNo, nextSqn, nextSqnForRecord, timestamp);
+    }
+
+    public Node withRow(RowData row, long timestamp) {
+        return new Node(row, sqn, prevSqn, nextSqn, nextSqnForRecord, timestamp);
+    }
+
+    public RowData getRow() {
+        return row;
+    }
+
+    public long getSqn() {
+        return sqn;
+    }
+
+    public Long getPrevSqn() {
+        return prevSqn;
+    }
+
+    public Long getNextSqn() {
+        return nextSqn;
+    }
+
+    public Long getNextSqnForRecord() {
+        return nextSqnForRecord;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Node)) {
+            return false;
+        }
+        Node node = (Node) o;
+        // do not compare row data since:
+        // 1. the type might be different after deserialization, e.g. GenericRowData vs
+        // BinaryRowData
+        // 2. proper comparison requires (generated) equalizer
+        // 3. equals is only used in tests (as opposed to RowDataKey)
+        return sqn == node.sqn
+                && Objects.equals(prevSqn, node.prevSqn)
+                && Objects.equals(nextSqn, node.nextSqn)
+                && Objects.equals(nextSqnForRecord, node.nextSqnForRecord);
+    }
+
+    @Override
+    public int hashCode() {
+        // rowData is ignored - see equals
+        return Objects.hash(sqn, prevSqn, nextSqn, nextSqnForRecord);
+    }
+}
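
Node is immutable: links are never mutated in place but rewritten via the with*
copies above. A sketch of appending a second node after the first, where rowA and
rowB stand for arbitrary RowData values (illustrative only):

    Node first = new Node(rowA, 1L, null, null, null, 777L);
    Node second = new Node(rowB, 2L, /* prevSqn */ 1L, null, null, 778L);
    first = first.withNext(2L); // global insertion order: 1 -> 2
    // if rowB is a duplicate of rowA, additionally chain the per-record order:
    // first = first.withNextForRecord(2L);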
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java
new file mode 100644
index 00000000000..aa6577e4fed
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.table.data.RowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link TypeSerializer} for {@link Node}. */
+@SuppressWarnings("NullableProblems")
+public class NodeSerializer extends CompositeSerializer<Node> {
+
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.INSTANCE;
+    private static final TypeSerializer<?> NULLABLE_LONG_SERIALIZER =
+            NullableSerializer.wrap(LONG_SERIALIZER, true);
+
+    public NodeSerializer(TypeSerializer<RowData> serializer) {
+        this(null, NodeField.getFieldSerializers(serializer));
+    }
+
+    protected NodeSerializer(
+            PrecomputedParameters precomputed, TypeSerializer<?>[] originalSerializers) {
+        //noinspection unchecked
+        super(
+                PrecomputedParameters.precompute(
+                        true, true, (TypeSerializer<Object>[]) originalSerializers),
+                originalSerializers);
+    }
+
+    private NodeSerializer(TypeSerializer<?>[] nestedSerializers) {
+        this(null, nestedSerializers);
+    }
+
+    @Override
+    public Node createInstance(Object... values) {
+        return new Node(
+                NodeField.ROW.get(values),
+                NodeField.SEQ_NO.get(values),
+                NodeField.PREV_SEQ_NO.get(values),
+                NodeField.NEXT_SEQ_NO.get(values),
+                NodeField.NEXT_SEQ_NO_FOR_RECORD.get(values),
+                NodeField.TIMESTAMP.get(values));
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return true;
+    }
+
+    @Override
+    protected Object getField(Node node, int index) {
+        return NodeField.get(node, index);
+    }
+
+    @Override
+    protected CompositeSerializer<Node> createSerializerInstance(
+            PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) {
+        return new NodeSerializer(precomputed, originalSerializers);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<Node> snapshotConfiguration() {
+        return new NodeSerializerSnapshot(this);
+    }
+
+    @Override
+    protected void setField(Node value, int index, Object fieldValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    private enum NodeField {
+        ROW {
+            @Override
+            Object get(Node node) {
+                return node.getRow();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return serializer;
+            }
+        },
+        SEQ_NO {
+            @Override
+            Object get(Node node) {
+                return node.getSqn();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return LONG_SERIALIZER;
+            }
+        },
+        PREV_SEQ_NO {
+            @Override
+            Object get(Node node) {
+                return node.getPrevSqn();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return NULLABLE_LONG_SERIALIZER;
+            }
+        },
+        NEXT_SEQ_NO {
+            @Override
+            Object get(Node node) {
+                return node.getNextSqn();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return NULLABLE_LONG_SERIALIZER;
+            }
+        },
+        NEXT_SEQ_NO_FOR_RECORD {
+            @Override
+            Object get(Node node) {
+                return node.getNextSqnForRecord();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return NULLABLE_LONG_SERIALIZER;
+            }
+        },
+        TIMESTAMP {
+            @Override
+            Object get(Node node) {
+                return node.getTimestamp();
+            }
+
+            @Override
+            public TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer) {
+                return LONG_SERIALIZER;
+            }
+        };
+
+        private static TypeSerializer<?>[] getFieldSerializers(TypeSerializer<RowData> serializer) {
+            List<TypeSerializer<?>> result = new ArrayList<>();
+            for (NodeField field : values()) {
+                result.add(field.getSerializer(serializer));
+            }
+            return result.toArray(new TypeSerializer[0]);
+        }
+
+        public abstract TypeSerializer<?> getSerializer(TypeSerializer<RowData> serializer);
+
+        abstract Object get(Node node);
+
+        <T> T get(Object... values) {
+            //noinspection unchecked
+            return (T) values[ordinal()];
+        }
+
+        public static Object get(Node node, int field) {
+            return values()[field].get(node);
+        }
+    }
+
+    /** {@link TypeSerializerSnapshot} of {@link NodeSerializer}. */
+    public static class NodeSerializerSnapshot
+            extends CompositeTypeSerializerSnapshot<Node, NodeSerializer> {
+        @SuppressWarnings("unused")
+        public NodeSerializerSnapshot() {}
+
+        NodeSerializerSnapshot(NodeSerializer nodeSerializer) {
+            super(nodeSerializer);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return 0;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(NodeSerializer outerSerializer) {
+            return outerSerializer.fieldSerializers;
+        }
+
+        @Override
+        protected NodeSerializer createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] nestedSerializers) {
+            return new NodeSerializer(nestedSerializers);
+        }
+    }
+}
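
Only the RowData serializer needs to be supplied; the nullable link fields are
wrapped with NullableSerializer internally. A construction sketch for rows with a
single VARCHAR column (illustrative only; RowDataSerializer is
org.apache.flink.table.runtime.typeutils.RowDataSerializer):

    NodeSerializer nodeSerializer =
            new NodeSerializer(new RowDataSerializer(DataTypes.STRING().getLogicalType()));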
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java
new file mode 100644
index 00000000000..9e449a5a42c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.types.RowKind;
+
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class wraps keys of type {@link RowData} for the following purposes:
+ *
+ * <ol>
+ *   <li>Fix the {@link RowKind} to be the same in all keys.
+ *   <li>Project the fields in case of upsert key.
+ *   <li>Fix {@link Object#equals(Object)} and hashCode for heap state backend.
+ *   <li>Potentially fix mutability for heap state backend (by copying using serializer)
+ * </ol>
+ */
+@Internal
+class RowDataKey {
+    private final RecordEqualiser equaliser;
+    private final HashFunction hashFunction;
+    final RowData rowData;
+
+    RowDataKey(RecordEqualiser equaliser, HashFunction hashFunction) {
+        this.equaliser = checkNotNull(equaliser);
+        this.hashFunction = checkNotNull(hashFunction);
+        this.rowData = null;
+    }
+
+    public RowDataKey(RowData rowData, RecordEqualiser equaliser, HashFunction hashFunction) {
+        this.equaliser = checkNotNull(equaliser);
+        this.hashFunction = checkNotNull(hashFunction);
+        this.rowData = checkNotNull(rowData);
+    }
+
+    public static RowDataKey toKeyNotProjected(
+            RowData row, RecordEqualiser equaliser, HashFunction hasher) {
+        return toKey(row, equaliser, hasher);
+    }
+
+    public static RowDataKey toKey(RowData row, RecordEqualiser equaliser, HashFunction hasher) {
+        row.setRowKind(INSERT);
+        return new RowDataKey(row, equaliser, hasher);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof RowDataKey)) {
+            return false;
+        }
+        RowDataKey other = (RowDataKey) o;
+        return equaliser.equals(rowData, other.rowData);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashFunction.hashCode(rowData);
+    }
+}
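
A sketch of the intended equals/hashCode behavior, with hand-written stand-ins for
the code-generated RecordEqualiser and HashFunction and a single-VARCHAR-column row
(illustrative only; RowDataKey is package-private, so this assumes same-package access):

    RecordEqualiser eq = (r1, r2) -> r1.getString(0).equals(r2.getString(0));
    HashFunction hash = r -> r.getString(0).hashCode();

    RowData insert = GenericRowData.ofKind(RowKind.INSERT, StringData.fromString("k"));
    RowData delete = GenericRowData.ofKind(RowKind.DELETE, StringData.fromString("k"));

    Map<RowDataKey, Long> sqnByRow = new HashMap<>();
    sqnByRow.put(RowDataKey.toKey(insert, eq, hash), 1L);
    // found despite the differing RowKind - toKey normalizes it to INSERT:
    sqnByRow.containsKey(RowDataKey.toKey(delete, eq, hash)); // true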
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java
new file mode 100644
index 00000000000..9db229160cc
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link TypeSerializer} for {@link RowDataKey}. */
+@Internal
+public class RowDataKeySerializer extends TypeSerializer<RowDataKey> {
+    final TypeSerializer<RowData> serializer;
+    final GeneratedRecordEqualiser equaliser; // used to snapshot
+    final GeneratedHashFunction hashFunction; // used to snapshot
+    final RecordEqualiser equalizerInstance; // passed to restored keys
+    final HashFunction hashFunctionInstance; // passed to restored keys
+
+    public RowDataKeySerializer(
+            TypeSerializer<RowData> serializer,
+            RecordEqualiser equalizerInstance,
+            HashFunction hashFunctionInstance,
+            GeneratedRecordEqualiser equaliser,
+            GeneratedHashFunction hashFunction) {
+        this.serializer = checkNotNull(serializer);
+        this.equalizerInstance = checkNotNull(equalizerInstance);
+        this.hashFunctionInstance = checkNotNull(hashFunctionInstance);
+        this.equaliser = checkNotNull(equaliser);
+        this.hashFunction = checkNotNull(hashFunction);
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<RowDataKey> duplicate() {
+        return new RowDataKeySerializer(
+                serializer.duplicate(),
+                equalizerInstance,
+                hashFunctionInstance,
+                equaliser,
+                hashFunction);
+    }
+
+    @Override
+    public RowDataKey createInstance() {
+        return new RowDataKey(equalizerInstance, hashFunctionInstance);
+    }
+
+    @Override
+    public RowDataKey copy(RowDataKey from) {
+        return RowDataKey.toKeyNotProjected(
+                serializer.copy(from.rowData), equalizerInstance, hashFunctionInstance);
+    }
+
+    @Override
+    public RowDataKey copy(RowDataKey from, RowDataKey reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return serializer.getLength();
+    }
+
+    @Override
+    public void serialize(RowDataKey record, DataOutputView target) throws IOException {
+        serializer.serialize(record.rowData, target);
+    }
+
+    @Override
+    public RowDataKey deserialize(DataInputView source) throws IOException {
+        return RowDataKey.toKeyNotProjected(
+                serializer.deserialize(source), equalizerInstance, hashFunctionInstance);
+    }
+
+    @Override
+    public RowDataKey deserialize(RowDataKey reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serializer.copy(source, target);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof RowDataKeySerializer) {
+            RowDataKeySerializer other = (RowDataKeySerializer) obj;
+            return serializer.equals(other.serializer)
+                    && equalizerInstance.equals(other.equalizerInstance)
+                    && hashFunctionInstance.equals(other.hashFunctionInstance);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(serializer, equalizerInstance, hashFunctionInstance);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<RowDataKey> snapshotConfiguration() {
+        return new RowDataKeySerializerSnapshot(this);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
new file mode 100644
index 00000000000..08d9888e18b
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link TypeSerializerSnapshot} of {@link RowDataKeySerializer}. */
+public class RowDataKeySerializerSnapshot implements TypeSerializerSnapshot<RowDataKey> {
+
+    private RowDataKeySerializer serializer;
+    private TypeSerializerSnapshot<RowData> restoredRowDataSerializerSnapshot;
+
+    @SuppressWarnings("unused")
+    public RowDataKeySerializerSnapshot() {
+        // this constructor is used when restoring from a checkpoint/savepoint.
+    }
+
+    public RowDataKeySerializerSnapshot(RowDataKeySerializer serializer) {
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public int getCurrentVersion() {
+        return 0;
+    }
+
+    @Override
+    public void writeSnapshot(DataOutputView out) throws IOException {
+        store(serializer.equaliser, out);
+        store(serializer.hashFunction, out);
+        writeVersionedSnapshot(out, serializer.serializer.snapshotConfiguration());
+    }
+
+    @Override
+    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+            throws IOException {
+        checkArgument(readVersion == 0, "Unexpected version: " + readVersion);
+
+        GeneratedRecordEqualiser equaliser = restore(in, userCodeClassLoader);
+        GeneratedHashFunction hashFunction = restore(in, userCodeClassLoader);
+
+        restoredRowDataSerializerSnapshot =
+                TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader);
+
+        serializer =
+                new RowDataKeySerializer(
+                        restoredRowDataSerializerSnapshot.restoreSerializer(),
+                        equaliser.newInstance(userCodeClassLoader),
+                        hashFunction.newInstance(userCodeClassLoader),
+                        equaliser,
+                        hashFunction);
+    }
+
+    private static void store(Object object, DataOutputView out) throws IOException {
+        byte[] bytes = InstantiationUtil.serializeObject(object);
+        out.writeInt(bytes.length);
+        out.write(bytes);
+    }
+
+    private <T> T restore(DataInputView in, ClassLoader classLoader) throws IOException {
+        int len = in.readInt();
+        byte[] bytes = new byte[len];
+        in.readFully(bytes); // read(byte[]) may return early; readFully reads all len bytes
+        try {
+            return InstantiationUtil.deserializeObject(bytes, classLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public TypeSerializer<RowDataKey> restoreSerializer() {
+        return serializer;
+    }
+
+    @Override
+    public TypeSerializerSchemaCompatibility<RowDataKey> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<RowDataKey> oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof RowDataKeySerializerSnapshot)) {
+            return TypeSerializerSchemaCompatibility.incompatible();
+        }
+
+        RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) oldSerializerSnapshot;
+
+        // resolve the new nested serializer's snapshot against the restored (old) one
+        TypeSerializerSchemaCompatibility<RowData> compatibility =
+                serializer.serializer
+                        .snapshotConfiguration()
+                        .resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot);
+
+        return mapToOuterCompatibility(
+                compatibility,
+                serializer.equalizerInstance,
+                serializer.hashFunctionInstance,
+                serializer.equaliser,
+                serializer.hashFunction);
+    }
+
+    private static TypeSerializerSchemaCompatibility<RowDataKey> mapToOuterCompatibility(
+            TypeSerializerSchemaCompatibility<RowData> rowDataCmp,
+            RecordEqualiser equaliserInstance,
+            HashFunction hashFunctionInstance,
+            GeneratedRecordEqualiser equaliser,
+            GeneratedHashFunction hashFunction) {
+        if (rowDataCmp.isCompatibleAsIs()) {
+            return TypeSerializerSchemaCompatibility.compatibleAsIs();
+        } else if (rowDataCmp.isCompatibleAfterMigration()) {
+            return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+        } else if (rowDataCmp.isCompatibleWithReconfiguredSerializer()) {
+            return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+                    new RowDataKeySerializer(
+                            rowDataCmp.getReconfiguredSerializer(),
+                            equaliserInstance,
+                            hashFunctionInstance,
+                            equaliser,
+                            hashFunction));
+        } else if (rowDataCmp.isIncompatible()) {
+            return TypeSerializerSchemaCompatibility.incompatible();
+        } else {
+            throw new UnsupportedOperationException("Unknown compatibility 
mode: " + rowDataCmp);
+        }
+    }
+}
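
The write/read pair above can be exercised directly; a sketch assuming keySerializer
is a fully configured RowDataKeySerializer (illustrative only; throws IOException):

    DataOutputSerializer out = new DataOutputSerializer(128);
    TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration());

    TypeSerializerSnapshot<RowDataKey> restored =
            TypeSerializerSnapshot.readVersionedSnapshot(
                    new DataInputDeserializer(out.getCopyOfBuffer()),
                    Thread.currentThread().getContextClassLoader());
    // restored.restoreSerializer() can then read keys written before the restore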
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java
new file mode 100644
index 00000000000..b8bcc438b68
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/** Stores the first and last SQN for a record. */
+class RowSqnInfo {
+    public final long firstSqn;
+    public final long lastSqn;
+
+    public RowSqnInfo(long firstSqn, long lastSqn) {
+        Preconditions.checkArgument(firstSqn <= lastSqn);
+        this.firstSqn = firstSqn;
+        this.lastSqn = lastSqn;
+    }
+
+    public static RowSqnInfo ofSingle(long sqn) {
+        return of(sqn, sqn);
+    }
+
+    public static RowSqnInfo of(long first, long last) {
+        return new RowSqnInfo(first, last);
+    }
+
+    @Override
+    public String toString() {
+        return "RowSqnInfo{" + "firstSqn=" + firstSqn + ", lastSqn=" + lastSqn 
+ '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof RowSqnInfo)) {
+            return false;
+        }
+        RowSqnInfo that = (RowSqnInfo) o;
+        return firstSqn == that.firstSqn && lastSqn == that.lastSqn;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(firstSqn, lastSqn);
+    }
+
+    public RowSqnInfo withFirstSqn(long firstSqn) {
+        return RowSqnInfo.of(firstSqn, lastSqn);
+    }
+}
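
In contrast to MetaSqnInfo, this entry tracks the SQN range occupied by duplicates
of a single record; a sketch (illustrative only, not part of the patch):

    RowSqnInfo info = RowSqnInfo.ofSingle(5L); // first occurrence at SQN 5
    info = RowSqnInfo.of(info.firstSqn, 9L);   // duplicate appended at SQN 9 -> [5, 9]
    info = info.withFirstSqn(9L);              // head removed -> range collapses to [9, 9]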
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java
new file mode 100644
index 00000000000..348071a22cd
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+
+@SuppressWarnings("ClassEscapesDefinedScope")
+public class RowSqnInfoSerializer extends CompositeSerializer<RowSqnInfo> {
+
+    public RowSqnInfoSerializer() {
+        this(null, LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+    }
+
+    protected RowSqnInfoSerializer(
+            PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) {
+        super(
+                PrecomputedParameters.precompute(
+                        true, true, (TypeSerializer<Object>[]) fieldSerializers),
+                fieldSerializers);
+    }
+
+    @Override
+    public RowSqnInfo createInstance(Object... values) {
+        return new RowSqnInfo((Long) values[0], (Long) values[1]);
+    }
+
+    @Override
+    protected void setField(RowSqnInfo sqnInfo, int index, Object fieldValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected Object getField(RowSqnInfo value, int index) {
+        switch (index) {
+            case 0:
+                return value.firstSqn;
+            case 1:
+                return value.lastSqn;
+            default:
+                throw new IllegalArgumentException("invalid index: " + index);
+        }
+    }
+
+    @Override
+    protected CompositeSerializer<RowSqnInfo> createSerializerInstance(
+            PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) {
+        return new RowSqnInfoSerializer(precomputed, originalSerializers);
+    }
+
+    @Override
+    public TypeSerializerSnapshot<RowSqnInfo> snapshotConfiguration() {
+        return new RowSqnInfoSerializerSnapshot(this);
+    }
+
+    public static class RowSqnInfoSerializerSnapshot
+            extends CompositeTypeSerializerSnapshot<RowSqnInfo, RowSqnInfoSerializer> {
+
+        @SuppressWarnings("unused")
+        public RowSqnInfoSerializerSnapshot() {}
+
+        RowSqnInfoSerializerSnapshot(RowSqnInfoSerializer serializer) {
+            super(serializer);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return 0;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(RowSqnInfoSerializer outerSerializer) {
+            return new TypeSerializer[] {LongSerializer.INSTANCE, LongSerializer.INSTANCE};
+        }
+
+        @Override
+        protected RowSqnInfoSerializer createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] nestedSerializers) {
+            return new RowSqnInfoSerializer(null, nestedSerializers);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
new file mode 100644
index 00000000000..f31db7c80b1
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.SerializerFactory;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.InternalKeyContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriFunctionWithException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_ALL;
+import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_LAST_ADDED;
+import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND;
+import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_OTHER;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Test for various implementations of {@link SequencedMultiSetState}. */
+@SuppressWarnings({"SameParameterValue", "unused"})
+@ExtendWith(ParameterizedTestExtension.class)
+public class SequencedMultiSetStateTest {
+
+    @Parameter(0)
+    private Strategy strategy;
+
+    @Parameter(1)
+    private long adaptiveLowThresholdOverride;
+
+    @Parameter(2)
+    private long adaptiveHighThresholdOverride;
+
+    @Parameters(name = "strategy={0}, lowThreshold={1}, highThreshold={2}")
+    public static Object[][] parameters() {
+        return new Object[][] {
+            new Object[] {Strategy.VALUE_STATE, -1, -1},
+            new Object[] {Strategy.MAP_STATE, -1, -1},
+            new Object[] {Strategy.ADAPTIVE, 0, 1},
+            new Object[] {Strategy.ADAPTIVE, 1, 2},
+            new Object[] {Strategy.ADAPTIVE, 0, 10},
+            new Object[] {Strategy.ADAPTIVE, 9, 10},
+        };
+    }
+
+    // for simplicity, all tests use string type only, with row key being the 1st column
+    private static final LogicalType VARCHAR = DataTypes.VARCHAR(50).getLogicalType();
+    public static final int KEY_POS = 0;
+
+    @TestTemplate
+    public void testBasicFlow() throws Exception {
+        runTest(
+                (state, keyContext) -> {
+                    keyContext.setCurrentKey("sk1");
+                    assertTrue(state.isEmpty());
+
+                    state.add(row("key", "value"), 1L);
+                    assertFalse(state.isEmpty());
+
+                    keyContext.setCurrentKey("sk2");
+                    assertTrue(state.isEmpty());
+
+                    keyContext.setCurrentKey("sk1");
+                    state.clear();
+                    assertStateContents(state);
+                });
+    }
+
+    @TestTemplate
+    public void testAppend() throws Exception {
+        runTest(
+                state -> {
+                    // should always keep appending
+                    state.append(row("k1", "x"), 777L);
+                    assertStateContents(state, Tuple2.of(row("k1", "x"), 
777L));
+
+                    state.append(row("k1", "x"), 778L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "x"), 777L),
+                            Tuple2.of(row("k1", "x"), 778L));
+
+                    state.append(row("k2", "y"), 779L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "x"), 777L),
+                            Tuple2.of(row("k1", "x"), 778L),
+                            Tuple2.of(row("k2", "y"), 779L));
+
+                    state.append(row("k1", "x"), 777L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "x"), 777L),
+                            Tuple2.of(row("k1", "x"), 778L),
+                            Tuple2.of(row("k2", "y"), 779L),
+                            Tuple2.of(row("k1", "x"), 777L));
+                });
+    }
+
+    @TestTemplate
+    public void testAdd() throws Exception {
+        runTest(
+                state -> {
+                    state.add(row("k1", "x"), 777L);
+                    assertStateContents(state, row("k1", "x"), 777L);
+
+                    state.add(row("k2", "x"), 777L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "x"), 777L),
+                            Tuple2.of(row("k2", "x"), 777L));
+
+                    state.add(row("k2", "y"), 778L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "x"), 777L),
+                            Tuple2.of(row("k2", "y"), 778L));
+
+                    state.add(row("k1", "y"), 778L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "y"), 778L),
+                            Tuple2.of(row("k2", "y"), 778L));
+                });
+    }
+
+    @TestTemplate
+    public void testRemove() throws Exception {
+        runTest(
+                state -> {
+                    removeAndAssert(state, row("key1"), REMOVAL_NOT_FOUND);
+
+                    state.add(row("key1", "value"), 777L);
+                    state.add(row("key2", "value"), 777L);
+                    state.add(row("key3", "value"), 777L);
+                    state.add(row("key4", "value"), 777L);
+
+                    removeAndAssert(state, row("key999"), REMOVAL_NOT_FOUND);
+                    removeAndAssert(state, row("key4"), REMOVAL_LAST_ADDED, 
row("key3", "value"));
+                    removeAndAssert(state, row("key3"), REMOVAL_LAST_ADDED, 
row("key2", "value"));
+                    removeAndAssert(state, row("key1"), REMOVAL_OTHER);
+                    removeAndAssert(
+                            state,
+                            row("key2", "value-to-return"),
+                            REMOVAL_ALL,
+                            // value-to-return should be returned, not the original value
+                            // according to the current logic of Flink operators
+                            row("key2", "value-to-return"));
+
+                    // shouldn't fail e.g. due to bad pointers
+                    removeAndAssert(state, row("key1"), REMOVAL_NOT_FOUND);
+                    removeAndAssert(state, row("key2"), REMOVAL_NOT_FOUND);
+                });
+    }
+
+    @TestTemplate
+    public void testAddAfterRemovingTail() throws Exception {
+        runTest(
+                state -> {
+                    state.add(row("key1", "value-1"), 777L);
+                    state.add(row("key2", "value-2"), 777L);
+
+                    // key1 is the tail now - remove it and then add
+                    removeAndAssert(state, row("key1"), REMOVAL_OTHER, 
row("key1", "value-1"));
+                    state.add(row("key1", "value-1"), 777L);
+
+                    // key2 is the tail now - remove it and then add
+                    removeAndAssert(state, row("key2"), REMOVAL_OTHER, 
row("key2", "value-2"));
+                    state.add(row("key2", "value-2"), 777L);
+                });
+    }
+
+    @TestTemplate
+    public void testRemoveFirstAppended() throws Exception {
+        runTest(
+                state -> {
+                    state.append(row("key", "value-1"), 777L);
+                    state.append(row("key", "value-2"), 778L);
+                    state.append(row("key", "value-3"), 779L);
+
+                    removeAndAssert(state, row("key"), REMOVAL_OTHER);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("key", "value-2"), 778L),
+                            Tuple2.of(row("key", "value-3"), 779L));
+
+                    removeAndAssert(state, row("key"), REMOVAL_OTHER);
+                    assertStateContents(state, Tuple2.of(row("key", 
"value-3"), 779L));
+
+                    removeAndAssert(state, row("key"), REMOVAL_ALL, 
row("key"));
+                    assertTrue(state.isEmpty());
+                });
+    }
+
+    @TestTemplate
+    public void testRemoveWithInterleavingRowAppended() throws Exception {
+        runTest(
+                state -> {
+                    state.append(row("key1", "value"), 777L); // sqn = 1
+                    state.append(row("key2", "value"), 777L); // sqn = 2
+                    state.append(row("key2", "value"), 778L); // sqn = 3
+                    removeAndAssert(state, row("key2"), REMOVAL_OTHER, 
row("key2", "value"));
+                    removeAndAssert(state, row("key2"), REMOVAL_LAST_ADDED, 
row("key1", "value"));
+                    removeAndAssert(
+                            state,
+                            row("key1", "value-to-return"),
+                            REMOVAL_ALL,
+                            row("key1", "value-to-return"));
+                });
+    }
+
+    /** Test that loading and clearing the cache doesn't impact correctness. */
+    @TestTemplate
+    public void testCaching() throws Exception {
+        runTest(
+                (state, ctx) -> {
+                    ctx.setCurrentKey("sk1");
+                    state.add(row("key", "value-1"), 777L);
+                    state.clearCache();
+                    assertFalse(state.isEmpty());
+
+                    ctx.setCurrentKey("sk2");
+                    state.loadCache();
+                    assertTrue(state.isEmpty());
+                });
+    }
+
+    /** Test that the row key is extracted from the row using the provided key extractor. */
+    @TestTemplate
+    public void testKeyExtraction() throws Exception {
+        final Function<RowData, RowData> keyExtractor =
+                row -> ProjectedRowData.from(new int[] {1}).replaceRow(row);
+
+        runTest(
+                (state, ctx) -> {
+                    ctx.setCurrentKey("sk1");
+                    state.add(row("value-123", "key"), 777L);
+                    assertFalse(state.isEmpty());
+                    StateChangeInfo<RowData> ret = state.remove(row("value-456", "key"));
+                    assertEquals(REMOVAL_ALL, ret.getChangeType());
+                    assertTrue(state.isEmpty());
+                },
+                keyExtractor,
+                0);
+    }
+
+    /** Test that row kind is not taken into account when matching the rows. */
+    @TestTemplate
+    public void testRowKindNormalization() throws Exception {
+        runTest(
+                state -> {
+                    for (RowKind firstKind : RowKind.values()) {
+                        for (RowKind secondKind : RowKind.values()) {
+
+                            state.append(rowOfKind(firstKind, "key", "value"), 
778L);
+                            state.remove(rowOfKind(secondKind, "key", 
"value"));
+                            assertTrue(state.isEmpty());
+
+                            state.add(rowOfKind(firstKind, "key", "value"), 
777L);
+                            state.remove(rowOfKind(secondKind, "key", 
"value"));
+                            assertTrue(state.isEmpty());
+
+                            state.add(rowOfKind(firstKind, "key", "value"), 
777L);
+                            state.add(rowOfKind(secondKind, "key", "value"), 
778L);
+                            assertStateContents(state, Tuple2.of(row("key", 
"value"), 778L));
+                            state.clear();
+                        }
+                    }
+                });
+    }
+
+    @TestTemplate
+    public void testAdaptivity() throws Exception {
+        assumeTrue(strategy == Strategy.ADAPTIVE);
+        final long totalSize = adaptiveHighThresholdOverride * 2;
+        runTest(
+                (state, ctx) -> {
+                    AdaptiveSequencedMultiSetState ad = 
(AdaptiveSequencedMultiSetState) state;
+                    int runningSize = 0;
+
+                    ctx.setCurrentKey("k1");
+                    assertFalse(ad.isIsUsingLargeState(), "should start with 
value state");
+                    for (; runningSize < totalSize; runningSize++) {
+                        assertEquals(
+                                runningSize >= adaptiveHighThresholdOverride,
+                                ad.isIsUsingLargeState(),
+                                "should switch after reaching high threshold");
+                        ad.append(row("key", "value"), runningSize /* 
timestamp */);
+                    }
+
+                    ctx.setCurrentKey("k2");
+                    assertFalse(ad.isIsUsingLargeState(), "should not mix 
different context keys");
+
+                    ctx.setCurrentKey("k1");
+                    assertTrue(ad.isIsUsingLargeState(), "should not mix 
different context keys");
+
+                    // remove until hitting the threshold - shouldn't trigger 
switch
+                    for (; runningSize > adaptiveLowThresholdOverride + 1; 
runningSize--) {
+                        ad.remove(row("key"));
+                        assertTrue(
+                                ad.isIsUsingLargeState(),
+                                "should switch back after reaching low 
threshold");
+                    }
+                    // trigger switch
+                    ad.remove(row("key"));
+                    runningSize--;
+                    assertFalse(
+                            ad.isIsUsingLargeState(),
+                            "should switch back after reaching low threshold");
+                    // verify the order of the migrated elements by looking at 
their timestamps
+                    //noinspection unchecked
+                    assertStateContents(
+                            state,
+                            LongStream.range(totalSize - runningSize, 
totalSize)
+                                    .mapToObj(ts -> Tuple2.of(row("key", 
"value"), ts))
+                                    .toArray(Tuple2[]::new));
+                    for (; runningSize > 0; runningSize--) {
+                        assertFalse(
+                                ad.isIsUsingLargeState(),
+                                "should switch back after reaching low 
threshold");
+                        ad.remove(row("key"));
+                    }
+                    assertTrue(ad.isEmpty());
+                    assertEquals(0, runningSize);
+
+                    for (; runningSize < totalSize; runningSize++) {
+                        assertEquals(
+                                runningSize >= adaptiveHighThresholdOverride,
+                                ad.isIsUsingLargeState(),
+                                "should switch after reaching high threshold");
+                        ad.add(row(Integer.toString(runningSize), "value"), 777L);
+                    }
+                    assertTrue(ad.isIsUsingLargeState());
+
+                    state.clear();
+                    assertFalse(
+                            ad.isIsUsingLargeState(), "should switch to value 
state after clear");
+                });
+    }
+
+    @TestTemplate
+    public void testAddReturnValues() throws Exception {
+        testReturnValues(SequencedMultiSetState::add);
+    }
+
+    @TestTemplate
+    public void testAppendReturnValues() throws Exception {
+        testReturnValues(SequencedMultiSetState::append);
+    }
+
+    private void testReturnValues(
+            TriFunctionWithException<
+                            SequencedMultiSetState<RowData>,
+                            RowData,
+                            Long,
+                            StateChangeInfo<RowData>,
+                            Exception>
+                    updateFn)
+            throws Exception {
+        runTest(
+                state -> {
+                    StateChangeInfo<RowData> ret;
+
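+                    // add() and append() share the same StateChangeInfo contract,
+                    // so both entry points run through the same assertions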
+                    ret = updateFn.apply(state, row("key-1", "value"), 777L);
+                    assertEquals(StateChangeType.ADDITION, ret.getChangeType());
+                    assertEquals(1, ret.getSizeAfter());
+                    assertTrue(ret.wasEmpty());
+
+                    ret = updateFn.apply(state, row("key-2", "value"), 777L);
+                    assertEquals(StateChangeType.ADDITION, ret.getChangeType());
+                    assertEquals(2, ret.getSizeAfter());
+                    assertFalse(ret.wasEmpty());
+
+                    removeAndAssert(state, row("key-1"), REMOVAL_OTHER);
+                    removeAndAssert(state, row("key-2"), REMOVAL_ALL, 
row("key-2"));
+
+                    ret = updateFn.apply(state, row("key-3", "value"), 777L);
+                    assertEquals(StateChangeType.ADDITION, ret.getChangeType());
+                    assertEquals(1, ret.getSizeAfter());
+                    assertTrue(ret.wasEmpty());
+                });
+    }
+
+    private void runTest(ThrowingConsumer<SequencedMultiSetState<RowData>, Exception> test)
+            throws Exception {
+        runTest(
+                (state, keyContext) -> {
+                    keyContext.setCurrentKey("key1");
+                    test.accept(state);
+                });
+    }
+
+    private void runTest(
+            BiConsumerWithException<
+                            SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>
+                    test)
+            throws Exception {
+        runTest(test, Function.identity(), KEY_POS);
+    }
+
+    private void runTest(
+            BiConsumerWithException<
+                            SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>
+                    test,
+            Function<RowData, RowData> keyExtractor,
+            int keyPos)
+            throws Exception {
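+        // assemble the state context: key and row serializers, generated
+        // equaliser/hash function for key comparison, and the strategy and
+        // thresholds under test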
+        SequencedMultiSetStateContext p =
+                new SequencedMultiSetStateContext(
+                        new RowDataSerializer(VARCHAR),
+                        new MyGeneratedEqualiser(keyPos),
+                        new MyGeneratedHashFunction(keyPos),
+                        new RowDataSerializer(VARCHAR, VARCHAR),
+                        keyExtractor,
+                        new SequencedMultiSetStateConfig(
+                                strategy,
+                                adaptiveHighThresholdOverride,
+                                adaptiveLowThresholdOverride,
+                                StateTtlConfig.DISABLED,
+                                TimeDomain.EVENT_TIME));
+
+        MockEnvironment env = new MockEnvironmentBuilder().build();
+
+        AbstractKeyedStateBackend<String> stateBackend =
+                getKeyedStateBackend(env, StringSerializer.INSTANCE);
+
+        RuntimeContext ctx =
+                new StreamingRuntimeContext(
+                        env,
+                        Collections.emptyMap(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup(),
+                        new OperatorID(),
+                        new TestProcessingTimeService(),
+                        getKeyedStateStore(stateBackend),
+                        ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
+
+        test.accept(SequencedMultiSetState.create(p, ctx, "hashmap"), 
stateBackend);
+    }
+
+    private static KeyedStateStore getKeyedStateStore(KeyedStateBackend<String> stateBackend) {
+        return new DefaultKeyedStateStore(
+                stateBackend,
+                new SerializerFactory() {
+                    @Override
+                    public <T> TypeSerializer<T> createSerializer(TypeInformation<T> ti) {
+                        return ti.createSerializer(new SerializerConfigImpl());
+                    }
+                });
+    }
+
+    private static <T> AbstractKeyedStateBackend<T> getKeyedStateBackend(
+            MockEnvironment env, TypeSerializer<T> keySerializer) throws IOException {
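+        // builds a heap (hashmap) keyed state backend over a small key-group range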
+        String op = "test-operator";
+        JobID jobId = new JobID();
+        JobVertexID jobVertexId = new JobVertexID();
+        KeyGroupRange emptyKeyGroupRange = KeyGroupRange.of(0, 10);
+        int numberOfKeyGroups = emptyKeyGroupRange.getNumberOfKeyGroups();
+
+        return new HashMapStateBackend()
+                .createKeyedStateBackend(
+                        new KeyedStateBackendParametersImpl<>(
+                                env,
+                                jobId,
+                                op,
+                                keySerializer,
+                                numberOfKeyGroups,
+                                emptyKeyGroupRange,
+                                new KvStateRegistry().createTaskRegistry(jobId, jobVertexId),
+                                TtlTimeProvider.DEFAULT,
+                                new UnregisteredMetricsGroup(),
+                                Collections.emptyList(),
+                                new CloseableRegistry()));
+    }
+
+    private static class MyGeneratedEqualiser extends GeneratedRecordEqualiser {
+
+        private final int keyPos;
+
+        public MyGeneratedEqualiser(int keyPos) {
+            super("", "", new Object[0]);
+            this.keyPos = keyPos;
+        }
+
+        @Override
+        public RecordEqualiser newInstance(ClassLoader classLoader) {
+            return new TestRecordEqualiser(keyPos);
+        }
+    }
+
+    private static class MyGeneratedHashFunction extends GeneratedHashFunction {
+
+        private final int keyPos;
+
+        public MyGeneratedHashFunction(int keyPos) {
+            super("", "", new Object[0], new Configuration());
+            this.keyPos = keyPos;
+        }
+
+        @Override
+        public HashFunction newInstance(ClassLoader classLoader) {
+            return new TestRecordEqualiser(keyPos);
+        }
+    }
+
+    private static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
+
+        private final int keyPos;
+
+        private TestRecordEqualiser(int keyPos) {
+            this.keyPos = keyPos;
+        }
+
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getString(keyPos).equals(row2.getString(keyPos));
+        }
+
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getString(keyPos));
+        }
+    }
+
+    private static void assertStateContents(
+            SequencedMultiSetState<RowData> state, RowData rowData, Long timestamp)
+            throws Exception {
+        assertStateContents(state, Tuple2.of(rowData, timestamp));
+    }
+
+    @SafeVarargs
+    private static void assertStateContents(
+            SequencedMultiSetState<RowData> state, Tuple2<RowData, Long>... expectedArr)
+            throws Exception {
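+        // drain the state iterator and compare (row, timestamp) pairs in order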
+        List<Tuple2<RowData, Long>> actual = new ArrayList<>();
+        state.iterator().forEachRemaining(actual::add);
+        assertEquals(expectedArr.length == 0, state.isEmpty());
+        assertEquals(expectedArr.length, actual.size());
+        Assertions.assertArrayEquals(expectedArr, actual.toArray());
+    }
+
+    private static void removeAndAssert(
+            SequencedMultiSetState<RowData> state,
+            RowData key,
+            StateChangeType expectedResultType,
+            RowData... expectedReturnedRow)
+            throws Exception {
+        StateChangeInfo<RowData> ret = state.remove(key);
+
+        assertEquals(expectedResultType, ret.getChangeType());
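+        // each change type implies its own payload/size/emptiness contract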
+        switch (ret.getChangeType()) {
+            case REMOVAL_NOT_FOUND:
+                assertEquals(Optional.empty(), ret.getPayload());
+                break;
+            case REMOVAL_ALL:
+                assertEquals(0, ret.getSizeAfter());
+                assertTrue(state.isEmpty(), "state is expected to be empty");
+                assertEquals(Optional.of(expectedReturnedRow[0]), ret.getPayload());
+                break;
+            case REMOVAL_OTHER:
+                assertFalse(state.isEmpty(), "state is expected to be 
non-empty");
+                assertEquals(Optional.empty(), ret.getPayload());
+                break;
+            case REMOVAL_LAST_ADDED:
+                assertFalse(state.isEmpty(), "state is expected to be 
non-empty");
+                assertEquals(Optional.of(expectedReturnedRow[0]), 
ret.getPayload());
+                break;
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java
new file mode 100644
index 00000000000..92bf9d29aaf
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Test for {@link MetaSqnInfoSerializer}. */
+class MetaSqnInfoSerializerTest extends SerializerTestBase<MetaSqnInfo> {
+
+    @Override
+    protected TypeSerializer<MetaSqnInfo> createSerializer() {
+        return new MetaSqnInfoSerializer();
+    }
+
+    @Override
+    protected int getLength() {
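+        // fixed-length format: two longs = 16 bytes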
+        return 2 * Long.SIZE / 8;
+    }
+
+    @Override
+    protected Class<MetaSqnInfo> getTypeClass() {
+        return MetaSqnInfo.class;
+    }
+
+    @Override
+    protected MetaSqnInfo[] getTestData() {
+        return new MetaSqnInfo[] {MetaSqnInfo.of(1L, 2L), MetaSqnInfo.of(1L, 1L)};
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java
new file mode 100644
index 00000000000..1dbdaf3b355
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordUtils;
+import org.apache.flink.table.types.logical.IntType;
+
+/** Test for {@link NodeSerializer}. */
+public class NodeSerializerTest extends SerializerTestBase<Node> {
+
+    @Override
+    protected TypeSerializer<Node> createSerializer() {
+        return new NodeSerializer(new RowDataSerializer(new IntType()));
+    }
+
+    @Override
+    protected int getLength() {
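+        // variable-length format: each node embeds a RowData payload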
+        return -1;
+    }
+
+    @Override
+    protected Class<Node> getTypeClass() {
+        return Node.class;
+    }
+
+    @Override
+    protected Node[] getTestData() {
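+        // nodes linked into a small chain; null sequence numbers mark the ends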
+        return new Node[] {
+            new Node(StreamRecordUtils.row(1), 1L, null, 2L, 2L, 1L),
+            new Node(StreamRecordUtils.row(2), 2L, 1L, 3L, 3L, 2L),
+            new Node(StreamRecordUtils.row(3), 3L, 2L, null, null, 3L),
+        };
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
new file mode 100644
index 00000000000..f78faf9f343
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordUtils;
+import org.apache.flink.table.types.logical.IntType;
+
+import java.util.Objects;
+
+/** Test for {@link RowDataKeySerializer}. */
+public class RowDataKeySerializerTest extends SerializerTestBase<RowDataKey> {
+
+    private final TestRecordEqualiser equaliser = new TestRecordEqualiser();
+
+    @Override
+    protected TypeSerializer<RowDataKey> createSerializer() {
+        return new RowDataKeySerializer(
+                new RowDataSerializer(new IntType()),
+                equaliser,
+                equaliser,
+                EQUALISER,
+                HASH_FUNCTION);
+    }
+
+    @Override
+    protected int getLength() {
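+        // variable-length format: depends on the wrapped RowData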
+        return -1;
+    }
+
+    @Override
+    protected Class<RowDataKey> getTypeClass() {
+        return RowDataKey.class;
+    }
+
+    @Override
+    protected RowDataKey[] getTestData() {
+        return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), equaliser, equaliser)};
+    }
+
+    static final GeneratedRecordEqualiser EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    static final GeneratedHashFunction HASH_FUNCTION =
+            new GeneratedHashFunction("", "", new Object[0], new 
Configuration()) {
+                @Override
+                public HashFunction newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind() && row1.getInt(0) == row2.getInt(0);
+        }
+
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getInt(0));
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return obj instanceof TestRecordEqualiser;
+        }
+
+        @Override
+        public int hashCode() {
+            return 0;
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java
new file mode 100644
index 00000000000..fd584c3862f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Test for {@link RowSqnInfoSerializer}. */
+class RowSqnInfoSerializerTest extends SerializerTestBase<RowSqnInfo> {
+
+    @Override
+    protected TypeSerializer<RowSqnInfo> createSerializer() {
+        return new RowSqnInfoSerializer();
+    }
+
+    @Override
+    protected int getLength() {
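+        // fixed-length format: two longs = 16 bytes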
+        return 2 * Long.SIZE / 8;
+    }
+
+    @Override
+    protected Class<RowSqnInfo> getTypeClass() {
+        return RowSqnInfo.class;
+    }
+
+    @Override
+    protected RowSqnInfo[] getTestData() {
+        return new RowSqnInfo[] {RowSqnInfo.of(1L, 2L), RowSqnInfo.of(1L, 1L)};
+    }
+}
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 1988f8e63b5..e4e9de6e3dd 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -193,6 +193,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
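+                       <!-- test classes from flink-table-runtime, so the serializer
+                            coverage check can locate the new serializer tests -->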
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime</artifactId>
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 691460a4012..976cef6349b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -74,6 +74,10 @@ import org.apache.flink.table.dataview.MapViewSerializer;
 import org.apache.flink.table.dataview.NullAwareMapSerializer;
 import org.apache.flink.table.dataview.NullSerializer;
 import org.apache.flink.table.runtime.operators.window.CountWindow;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializer;
+import org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowSqnInfoSerializer;
 import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.DecimalDataSerializer;
@@ -255,6 +259,10 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
                         // KeyAndValueSerializer shouldn't be used to serialize data to state and
                         // doesn't need to ensure upgrade compatibility.
                         "org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer",
+                        RowDataKeySerializer.class.getName(),
+                        NodeSerializer.class.getName(),
+                        RowSqnInfoSerializer.class.getName(),
+                        MetaSqnInfoSerializer.class.getName(),
                         SetSerializer.class.getName());
 
         // check if a test exists for each type serializer
