This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dbfc83d1cfd [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 (#25515)
dbfc83d1cfd is described below

commit dbfc83d1cfd16bb464fe8d2329b4b13cd440820d
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Wed Oct 30 11:35:47 2024 +0800

    [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 (#25515)
---
 .../asyncprocessing/AbstractStateIterator.java     |   5 +-
 .../runtime/asyncprocessing/StateRequestType.java  |   6 -
 .../runtime/state/AsyncKeyedStateBackend.java      |  19 +
 .../runtime/state/ttl/AbstractTtlDecorator.java    |  41 +-
 .../flink/runtime/state/ttl/TtlReduceFunction.java |   4 +-
 .../flink/runtime/state/ttl/TtlStateContext.java   |  14 +-
 .../apache/flink/runtime/state/ttl/TtlUtils.java   |   9 +-
 .../runtime/state/v2/AbstractAggregatingState.java |  55 +--
 .../runtime/state/v2/AbstractReducingState.java    |  52 ++-
 .../flink/runtime/state/v2/MapStateDescriptor.java |   8 +
 .../state/v2/adaptor/AggregatingStateAdaptor.java  |  41 ++
 .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java  |  12 +
 .../state/v2/adaptor/CompleteStateIterator.java    |   2 +-
 .../state/v2/adaptor/ReducingStateAdaptor.java     |  39 +-
 .../v2/internal/InternalAggregatingState.java      |   4 +-
 .../state/v2/internal/InternalAppendingState.java  |  31 +-
 .../runtime/state/v2/ttl/AbstractTtlState.java     |  57 +++
 .../runtime/state/v2/ttl/TtlAggregateFunction.java |  84 ++++
 .../runtime/state/v2/ttl/TtlAggregatingState.java  | 104 +++++
 .../flink/runtime/state/v2/ttl/TtlListState.java   | 219 +++++++++++
 .../flink/runtime/state/v2/ttl/TtlMapState.java    | 281 +++++++++++++
 .../runtime/state/v2/ttl/TtlReducingState.java     |  94 +++++
 .../runtime/state/v2/ttl/TtlStateFactory.java      | 433 +++++++++++++++++++++
 .../flink/runtime/state/v2/ttl/TtlValueState.java  |  62 +++
 .../flink/runtime/state/StateBackendTestUtils.java |  11 +
 .../state/v2/AbstractAggregatingStateTest.java     |  12 +-
 .../state/v2/AbstractKeyedStateTestBase.java       |  11 +
 .../state/v2/AbstractReducingStateTest.java        |  14 +-
 .../runtime/state/v2/StateBackendTestV2Base.java   |  62 +++
 .../flink/state/forst/ForStAggregatingState.java   |   5 +-
 .../forst/ForStDBTtlCompactFiltersManager.java     |  24 +-
 .../flink/state/forst/ForStKeyedStateBackend.java  |  34 +-
 .../state/forst/ForStKeyedStateBackendBuilder.java |  14 +-
 .../flink/state/forst/ForStOperationUtils.java     |  31 +-
 .../flink/state/forst/ForStReducingState.java      |   4 +-
 .../flink/state/forst/ForStStateBackend.java       |   1 +
 .../completeness/TypeInfoTestCoverageTest.java     |   4 +-
 .../TypeSerializerTestCoverageTest.java            |   2 +
 38 files changed, 1800 insertions(+), 105 deletions(-)
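
For orientation, here is a minimal usage sketch of the TTL support this commit wires into the
Async State API. It assumes the v2 state descriptors expose enableTimeToLive(StateTtlConfig)
analogously to the v1 descriptors; the descriptor method, state name, and types below are
illustrative only and are not taken from this commit.

    // Hypothetical sketch. Only create/write-based TTL updates apply here, since
    // StateTtlConfig.UpdateType.OnReadAndWrite is not supported by the v2 TTL wrappers.
    StateTtlConfig ttlConfig =
            StateTtlConfig.newBuilder(Duration.ofMinutes(5))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build();

    ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("counter", Types.LONG);
    descriptor.enableTimeToLive(ttlConfig); // assumed v2 counterpart of the v1 method

    // With TTL enabled, the new TtlStateFactory wraps the created state (e.g. TtlValueState),
    // so asyncValue()/asyncUpdate() transparently check and refresh the expiration timestamp.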

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
index 12ffdaae427..7683fc6a482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
@@ -106,7 +106,10 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> {
         Collection<StateFuture<? extends U>> resultFutures = new ArrayList<>();
 
         for (T item : cache) {
-            resultFutures.add(iterating.apply(item));
+            StateFuture<? extends U> resultFuture = iterating.apply(item);
+            if (resultFuture != null) {
+                resultFutures.add(resultFuture);
+            }
         }
         if (hasNext()) {
             return StateFutureUtils.combineAll(resultFutures)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java
index 8d449692105..504115a48fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java
@@ -108,15 +108,9 @@ public enum StateRequestType {
     /** Add element into reducing state, {@link ReducingState#asyncAdd(Object)}. */
     REDUCING_ADD,
 
-    /** Remove element from reducing state. */
-    REDUCING_REMOVE,
-
     /** Get value from aggregating state by {@link AggregatingState#asyncGet()}. */
     AGGREGATING_GET,
 
-    /** Remove element from aggregate state. */
-    AGGREGATING_REMOVE,
-
     /** Add element to aggregating state by {@link AggregatingState#asyncAdd(Object)}. */
     AGGREGATING_ADD
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index aebd8af24dc..d6e606529aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.asyncprocessing.StateExecutor;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.state.v2.StateDescriptor;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 import org.apache.flink.util.Disposable;
 
 import javax.annotation.Nonnull;
@@ -72,6 +73,24 @@ public interface AsyncKeyedStateBackend<K>
             @Nonnull StateDescriptor<SV> stateDesc)
             throws Exception;
 
+    /**
+     * Creates and returns a new state for internal usage.
+     *
+     * @param <N> the type of namespace for partitioning.
+     * @param <S> The type of the public API state.
+     * @param <SV> The type of the stored state value.
+     * @param defaultNamespace the default namespace for this state.
+     * @param namespaceSerializer the serializer for namespace.
+     * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+     * @throws Exception Exceptions may occur during initialization of the state.
+     */
+    @Nonnull
+    <N, S extends InternalKeyedState, SV> S createStateInternal(
+            @Nonnull N defaultNamespace,
+            @Nonnull TypeSerializer<N> namespaceSerializer,
+            @Nonnull StateDescriptor<SV> stateDesc)
+            throws Exception;
+
     /**
      * Creates a {@code StateExecutor} which supports to execute a batch of state requests
      * asynchronously.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index cd08292ec9c..ac42b72d009 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -29,24 +29,25 @@ import org.apache.flink.util.function.ThrowingRunnable;
  *
  * @param <T> Type of originally wrapped object
  */
-abstract class AbstractTtlDecorator<T> {
+public abstract class AbstractTtlDecorator<T> {
     /** Wrapped original state handler. */
-    final T original;
+    protected final T original;
 
-    final StateTtlConfig config;
+    protected final StateTtlConfig config;
 
-    final TtlTimeProvider timeProvider;
+    protected final TtlTimeProvider timeProvider;
 
     /** Whether to renew expiration timestamp on state read access. */
-    final boolean updateTsOnRead;
+    protected final boolean updateTsOnRead;
 
     /** Whether to renew expiration timestamp on state read access. */
-    final boolean returnExpired;
+    protected final boolean returnExpired;
 
     /** State value time to live in milliseconds. */
-    final long ttl;
+    protected final long ttl;
 
-    AbstractTtlDecorator(T original, StateTtlConfig config, TtlTimeProvider timeProvider) {
+    protected AbstractTtlDecorator(
+            T original, StateTtlConfig config, TtlTimeProvider timeProvider) {
         Preconditions.checkNotNull(original);
         Preconditions.checkNotNull(config);
         Preconditions.checkNotNull(timeProvider);
@@ -60,25 +61,25 @@ abstract class AbstractTtlDecorator<T> {
         this.ttl = config.getTimeToLive().toMillis();
     }
 
-    <V> V getUnexpired(TtlValue<V> ttlValue) {
+    public <V> V getUnexpired(TtlValue<V> ttlValue) {
         return ttlValue == null || (!returnExpired && expired(ttlValue))
                 ? null
                 : ttlValue.getUserValue();
     }
 
-    <V> boolean expired(TtlValue<V> ttlValue) {
+    public <V> boolean expired(TtlValue<V> ttlValue) {
         return TtlUtils.expired(ttlValue, ttl, timeProvider);
     }
 
-    <V> TtlValue<V> wrapWithTs(V value) {
+    public <V> TtlValue<V> wrapWithTs(V value) {
         return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
     }
 
-    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+    public <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
         return wrapWithTs(ttlValue.getUserValue());
     }
 
-    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V>
+    public <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V>
             V getWithTtlCheckAndUpdate(
                     SupplierWithException<TtlValue<V>, SE> getter,
                     ThrowingConsumer<TtlValue<V>, CE> updater,
@@ -88,7 +89,7 @@ abstract class AbstractTtlDecorator<T> {
         return ttlValue == null ? null : ttlValue.getUserValue();
     }
 
-    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V>
+    public <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V>
             TtlValue<V> getWrappedWithTtlCheckAndUpdate(
                     SupplierWithException<TtlValue<V>, SE> getter,
                     ThrowingConsumer<TtlValue<V>, CE> updater,
@@ -107,4 +108,16 @@ abstract class AbstractTtlDecorator<T> {
         }
         return ttlValue;
     }
+
+    protected <T> T getElementWithTtlCheck(TtlValue<T> ttlValue) {
+        if (ttlValue == null) {
+            return null;
+        } else if (expired(ttlValue)) {
+            // don't clear state here cause forst is LSM-tree based.
+            if (!returnExpired) {
+                return null;
+            }
+        }
+        return ttlValue.getUserValue();
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
index 3d4ef4cc62e..eb30f0c70f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.common.state.StateTtlConfig;
  *
  * @param <T> Type of the user value of state with TTL
  */
-class TtlReduceFunction<T> extends AbstractTtlDecorator<ReduceFunction<T>>
+public class TtlReduceFunction<T> extends AbstractTtlDecorator<ReduceFunction<T>>
         implements ReduceFunction<TtlValue<T>> {
 
-    TtlReduceFunction(
+    public TtlReduceFunction(
             ReduceFunction<T> originalReduceFunction,
             StateTtlConfig config,
             TtlTimeProvider timeProvider) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
index c6f629c326c..994b68c9fa7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
@@ -21,20 +21,20 @@ package org.apache.flink.runtime.state.ttl;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-class TtlStateContext<T, SV> {
+public class TtlStateContext<T, SV> {
     /** Wrapped original state handler. */
-    final T original;
+    public final T original;
 
-    final StateTtlConfig config;
-    final TtlTimeProvider timeProvider;
+    public final StateTtlConfig config;
+    public final TtlTimeProvider timeProvider;
 
     /** Serializer of original user stored value without timestamp. */
-    final TypeSerializer<SV> valueSerializer;
+    public final TypeSerializer<SV> valueSerializer;
 
     /** This registered callback is to be called whenever state is accessed for read or write. */
-    final Runnable accessCallback;
+    public final Runnable accessCallback;
 
-    TtlStateContext(
+    public TtlStateContext(
             T original,
             StateTtlConfig config,
             TtlTimeProvider timeProvider,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index dbe937647ca..e4ce8927eb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -22,17 +22,18 @@ import javax.annotation.Nullable;
 
 /** Common functions related to State TTL. */
 public class TtlUtils {
-    static <V> boolean expired(
+    public static <V> boolean expired(
             @Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
         return expired(ttlValue, ttl, timeProvider.currentTimestamp());
     }
 
-    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
+    public static <V> boolean expired(
+            @Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
         return ttlValue != null
                 && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
     }
 
-    static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
+    public static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
         return expired(ts, ttl, timeProvider.currentTimestamp());
     }
 
@@ -45,7 +46,7 @@ public class TtlUtils {
         return ts + ttlWithoutOverflow;
     }
 
-    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+    public static <V> TtlValue<V> wrapWithTs(V value, long ts) {
         return new TtlValue<>(value, ts);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
index 899aeec1c48..95850d928ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java
@@ -58,49 +58,53 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
         this.aggregateFunction = stateDescriptor.getAggregateFunction();
     }
 
-    protected StateFuture<ACC> asyncGetAccumulator() {
-        return handleRequest(StateRequestType.AGGREGATING_GET, null);
-    }
-
     @Override
     public StateFuture<OUT> asyncGet() {
-        return asyncGetAccumulator()
+        return asyncGetInternal()
                 .thenApply(acc -> (acc == null) ? null : this.aggregateFunction.getResult(acc));
     }
 
     @Override
     public StateFuture<Void> asyncAdd(IN value) {
-        return asyncGetAccumulator()
+        return asyncGetInternal()
                 .thenAccept(
                         acc -> {
                             final ACC safeAcc =
                                     (acc == null)
                                             ? this.aggregateFunction.createAccumulator()
                                             : acc;
-                            handleRequest(
-                                    StateRequestType.AGGREGATING_ADD,
-                                    this.aggregateFunction.add(value, safeAcc));
+                            asyncUpdateInternal(this.aggregateFunction.add(value, safeAcc));
                         });
     }
 
+    @Override
+    public StateFuture<ACC> asyncGetInternal() {
+        return handleRequest(StateRequestType.AGGREGATING_GET, null);
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) {
+        return handleRequest(StateRequestType.AGGREGATING_ADD, valueToStore);
+    }
+
     @Override
     public OUT get() {
-        return handleRequestSync(StateRequestType.AGGREGATING_GET, null);
+        ACC acc = getInternal();
+        return acc == null ? null : this.aggregateFunction.getResult(acc);
     }
 
     @Override
     public void add(IN value) {
-        ACC acc = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
+        ACC acc = getInternal();
         try {
             ACC newValue =
                     acc == null
                             ? this.aggregateFunction.createAccumulator()
                             : this.aggregateFunction.add(value, acc);
-            handleRequestSync(StateRequestType.AGGREGATING_ADD, newValue);
+            updateInternal(newValue);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        handleRequestSync(StateRequestType.AGGREGATING_ADD, value);
     }
 
     @Override
@@ -113,16 +117,16 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
         for (N source : sources) {
             if (source != null) {
                 setCurrentNamespace(source);
-                futures.add(handleRequest(StateRequestType.AGGREGATING_GET, null));
+                futures.add(asyncGetInternal());
             }
         }
         setCurrentNamespace(target);
-        futures.add(handleRequest(StateRequestType.AGGREGATING_GET, null));
+        futures.add(asyncGetInternal());
         // phase 2: merge the sources to the target
         return StateFutureUtils.combineAll(futures)
                 .thenCompose(
                         values -> {
-                            List<StateFuture<ACC>> updateFutures =
+                            List<StateFuture<Void>> updateFutures =
                                     new ArrayList<>(sources.size() + 1);
                             ACC current = null;
                             Iterator<ACC> valueIterator = values.iterator();
@@ -130,9 +134,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
                                 ACC value = valueIterator.next();
                                 if (value != null) {
                                     setCurrentNamespace(source);
-                                    updateFutures.add(
-                                            handleRequest(
-                                                    StateRequestType.AGGREGATING_REMOVE, null));
+                                    updateFutures.add(asyncUpdateInternal(null));
                                     if (current == null) {
                                         current = value;
                                     } else {
@@ -146,8 +148,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
                                     current = aggregateFunction.merge(current, targetValue);
                                 }
                                 setCurrentNamespace(target);
-                                updateFutures.add(
-                                        handleRequest(StateRequestType.AGGREGATING_ADD, current));
+                                updateFutures.add(asyncUpdateInternal(current));
                             }
                             return StateFutureUtils.combineAll(updateFutures)
                                     .thenAccept(ignores -> {});
@@ -168,7 +169,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
                     ACC oldValue = handleRequestSync(StateRequestType.AGGREGATING_GET, null);
 
                     if (oldValue != null) {
-                        handleRequestSync(StateRequestType.AGGREGATING_REMOVE, null);
+                        handleRequestSync(StateRequestType.AGGREGATING_ADD, null);
 
                         if (current != null) {
                             current = aggregateFunction.merge(current, oldValue);
@@ -194,4 +195,14 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS
             throw new RuntimeException("merge namespace fail.", e);
         }
     }
+
+    @Override
+    public ACC getInternal() {
+        return handleRequestSync(StateRequestType.AGGREGATING_GET, null);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) {
+        handleRequestSync(StateRequestType.AGGREGATING_ADD, valueToStore);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
index de7a19fd157..4fb7967bd6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java
@@ -50,33 +50,33 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
 
     @Override
     public StateFuture<V> asyncGet() {
-        return handleRequest(StateRequestType.REDUCING_GET, null);
+        return asyncGetInternal();
     }
 
     @Override
     public StateFuture<Void> asyncAdd(V value) {
-        return handleRequest(StateRequestType.REDUCING_GET, null)
+        return asyncGetInternal()
                 .thenAccept(
                         oldValue -> {
                             V newValue =
                                     oldValue == null
                                             ? value
                                             : reduceFunction.reduce((V) oldValue, value);
-                            handleRequest(StateRequestType.REDUCING_ADD, newValue);
+                            asyncUpdateInternal(newValue);
                         });
     }
 
     @Override
     public V get() {
-        return handleRequestSync(StateRequestType.REDUCING_GET, null);
+        return getInternal();
     }
 
     @Override
     public void add(V value) {
-        V oldValue = handleRequestSync(StateRequestType.REDUCING_GET, null);
+        V oldValue = getInternal();
         try {
            V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
-            handleRequestSync(StateRequestType.REDUCING_ADD, newValue);
+            updateInternal(newValue);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -92,16 +92,16 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
         for (N source : sources) {
             if (source != null) {
                 setCurrentNamespace(source);
-                futures.add(handleRequest(StateRequestType.REDUCING_GET, null));
+                futures.add(asyncGetInternal());
             }
         }
         setCurrentNamespace(target);
-        futures.add(handleRequest(StateRequestType.REDUCING_GET, null));
+        futures.add(asyncGetInternal());
         // phase 2: merge the sources to the target
         return StateFutureUtils.combineAll(futures)
                 .thenCompose(
                         values -> {
-                            List<StateFuture<V>> updateFutures =
+                            List<StateFuture<Void>> updateFutures =
                                     new ArrayList<>(sources.size() + 1);
                             V current = null;
                             Iterator<V> valueIterator = values.iterator();
@@ -109,8 +109,7 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
                                 V value = valueIterator.next();
                                 if (value != null) {
                                     setCurrentNamespace(source);
-                                    updateFutures.add(
-                                            handleRequest(StateRequestType.REDUCING_REMOVE, null));
+                                    updateFutures.add(asyncUpdateInternal(null));
                                     if (current != null) {
                                         current = reduceFunction.reduce(current, value);
                                     } else {
@@ -124,8 +123,7 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
                                     current = reduceFunction.reduce(current, targetValue);
                                 }
                                 setCurrentNamespace(target);
-                                updateFutures.add(
-                                        handleRequest(StateRequestType.REDUCING_ADD, current));
+                                updateFutures.add(asyncUpdateInternal(current));
                             }
                             return StateFutureUtils.combineAll(updateFutures)
                                     .thenAccept(ignores -> {});
@@ -143,10 +141,10 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
             for (N source : sources) {
                 if (source != null) {
                     setCurrentNamespace(source);
-                    V oldValue = handleRequestSync(StateRequestType.REDUCING_GET, null);
+                    V oldValue = getInternal();
 
                     if (oldValue != null) {
-                        handleRequestSync(StateRequestType.REDUCING_REMOVE, null);
+                        updateInternal(null);
 
                         if (current != null) {
                             current = reduceFunction.reduce(current, oldValue);
@@ -161,15 +159,35 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V>
             if (current != null) {
                 // create the target full-binary-key
                 setCurrentNamespace(target);
-                V targetValue = handleRequestSync(StateRequestType.REDUCING_GET, null);
+                V targetValue = getInternal();
 
                 if (targetValue != null) {
                     current = reduceFunction.reduce(current, targetValue);
                 }
-                handleRequestSync(StateRequestType.REDUCING_ADD, current);
+                updateInternal(current);
             }
         } catch (Exception e) {
             throw new RuntimeException("merge namespace fail.", e);
         }
     }
+
+    @Override
+    public StateFuture<V> asyncGetInternal() {
+        return handleRequest(StateRequestType.REDUCING_GET, null);
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(V valueToStore) {
+        return handleRequest(StateRequestType.REDUCING_ADD, valueToStore);
+    }
+
+    @Override
+    public V getInternal() {
+        return handleRequestSync(StateRequestType.REDUCING_GET, null);
+    }
+
+    @Override
+    public void updateInternal(V valueToStore) {
+        handleRequestSync(StateRequestType.REDUCING_ADD, valueToStore);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
index bae29db5502..64138e340dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java
@@ -34,6 +34,8 @@ import javax.annotation.Nonnull;
  * @param <UV> The type of the values that the map state can hold.
  */
 public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
+    /** The type of the user key in the state. */
+    @Nonnull private final TypeInformation<UK> userKeyTypeInfo;
 
     /** The serializer for the user key. */
     @Nonnull private final TypeSerializer<UK> userKeySerializer;
@@ -67,9 +69,15 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
             TypeInformation<UV> userValueTypeInfo,
             SerializerConfig serializerConfig) {
         super(stateId, userValueTypeInfo, serializerConfig);
+        this.userKeyTypeInfo = userKeyTypeInfo;
         this.userKeySerializer = userKeyTypeInfo.createSerializer(serializerConfig);
     }
 
+    @Nonnull
+    public TypeInformation<UK> getUserKeyType() {
+        return userKeyTypeInfo;
+    }
+
     @Nonnull
     public TypeSerializer<UK> getUserKeySerializer() {
         return userKeySerializer.duplicate();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java
index 1b7c90be71b..08c158e08e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java
@@ -45,4 +45,45 @@ public class AggregatingStateAdaptor<K, N, IN, ACC, OUT>
             throw new RuntimeException("Error while get value from raw 
AggregatingState", e);
         }
     }
+
+    @Override
+    public StateFuture<ACC> asyncGetInternal() {
+        try {
+            return StateFutureUtils.completedFuture(delegatedState.getInternal());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Error while get internal value from raw AggregatingState", e);
+        }
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) {
+        try {
+            delegatedState.updateInternal(valueToStore);
+            return StateFutureUtils.completedVoidFuture();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Error while update internal value to raw AggregatingState", e);
+        }
+    }
+
+    @Override
+    public ACC getInternal() {
+        try {
+            return delegatedState.getInternal();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Error while get internal value from raw AggregatingState", e);
+        }
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) {
+        try {
+            delegatedState.updateInternal(valueToStore);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Error while update internal value to raw AggregatingState", e);
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index bf48415801f..28c6993c9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 
 import javax.annotation.Nonnull;
 
@@ -75,6 +76,17 @@ public class AsyncKeyedStateBackendAdaptor<K> implements AsyncKeyedStateBackend<
             @Nonnull TypeSerializer<N> namespaceSerializer,
             @Nonnull StateDescriptor<SV> stateDesc)
             throws Exception {
+        return createStateInternal(defaultNamespace, namespaceSerializer, stateDesc);
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public <N, S extends InternalKeyedState, SV> S createStateInternal(
+            @Nonnull N defaultNamespace,
+            @Nonnull TypeSerializer<N> namespaceSerializer,
+            @Nonnull StateDescriptor<SV> stateDesc)
+            throws Exception {
         org.apache.flink.api.common.state.StateDescriptor rawStateDesc =
                 StateDescriptorUtils.transformFromV2ToV1(stateDesc);
         org.apache.flink.api.common.state.State rawState =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
index fc121a70da7..bf2f3f23a09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
@@ -35,7 +35,7 @@ public class CompleteStateIterator<T> implements StateIterator<T> {
     final Iterator<T> iterator;
     final boolean empty;
 
-    CompleteStateIterator(Iterable<T> iterable) {
+    public CompleteStateIterator(Iterable<T> iterable) {
         this.iterator = iterable.iterator();
         this.empty = !iterator.hasNext();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java
index 276cef760ff..37a0f4d98b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java
@@ -39,7 +39,44 @@ public class ReducingStateAdaptor<K, N, V> extends MergingStateAdaptor<K, N, V,
         try {
             return StateFutureUtils.completedFuture(delegatedState.get());
         } catch (Exception e) {
-            throw new RuntimeException("Error while get value from raw AggregatingState", e);
+            throw new RuntimeException("Error while get value from raw ReducingState", e);
+        }
+    }
+
+    @Override
+    public StateFuture<V> asyncGetInternal() {
+        try {
+            return StateFutureUtils.completedFuture(delegatedState.getInternal());
+        } catch (Exception e) {
+            throw new RuntimeException("Error while get value from raw ReducingState", e);
+        }
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(V valueToStore) {
+        try {
+            delegatedState.updateInternal(valueToStore);
+            return StateFutureUtils.completedVoidFuture();
+        } catch (Exception e) {
+            throw new RuntimeException("Error while update value to raw ReducingState", e);
+        }
+    }
+
+    @Override
+    public V getInternal() {
+        try {
+            return delegatedState.getInternal();
+        } catch (Exception e) {
+            throw new RuntimeException("Error while get internal value from raw ReducingState", e);
+        }
+    }
+
+    @Override
+    public void updateInternal(V valueToStore) {
+        try {
+            delegatedState.updateInternal(valueToStore);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while update internal value to raw ReducingState", e);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java
index 8b5d7b2dfa2..bc469825d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java
@@ -29,4 +29,6 @@ import org.apache.flink.api.common.state.v2.AggregatingState;
  * @param <OUT> The type of the values that are returned from the state.
  */
 public interface InternalAggregatingState<K, N, IN, ACC, OUT>
-        extends InternalMergingState<K, N, IN, ACC, OUT, OUT>, AggregatingState<IN, OUT> {}
+        extends InternalMergingState<K, N, IN, ACC, OUT, OUT>,
+                AggregatingState<IN, OUT>,
+                InternalKeyedState<K, N, ACC> {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java
index 6b9b323d8fe..7e938c6fbe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.v2.internal;
 
 import org.apache.flink.api.common.state.v2.AppendingState;
+import org.apache.flink.api.common.state.v2.StateFuture;
 
 /**
  * This class defines the internal interface for appending state.
@@ -30,4 +31,32 @@ import org.apache.flink.api.common.state.v2.AppendingState;
 * @param <SYNCOUT> Type of the value that can be retrieved from the state by synchronous interface.
  */
 public interface InternalAppendingState<K, N, IN, SV, OUT, SYNCOUT>
-        extends InternalKeyedState<K, N, SV>, AppendingState<IN, OUT, SYNCOUT> {}
+        extends InternalKeyedState<K, N, SV>, AppendingState<IN, OUT, SYNCOUT> {
+    /**
+     * Get internally stored value.
+     *
+     * @return internally stored value.
+     */
+    StateFuture<SV> asyncGetInternal();
+
+    /**
+     * Update internally stored value.
+     *
+     * @param valueToStore new value to store.
+     */
+    StateFuture<Void> asyncUpdateInternal(SV valueToStore);
+
+    /**
+     * Get internally stored value.
+     *
+     * @return internally stored value.
+     */
+    SV getInternal();
+
+    /**
+     * Update internally stored value.
+     *
+     * @param valueToStore new value to store.
+     */
+    void updateInternal(SV valueToStore);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java
new file mode 100644
index 00000000000..5c06a739844
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
+
+/**
+ * Base class for TTL logic wrappers of state objects. state V2 does not support
+ * FULL_STATE_SCAN_SNAPSHOT and INCREMENTAL_CLEANUP, only supports ROCKSDB_COMPACTION_FILTER.
+ * UpdateType#OnReadAndWrite is also not supported in state V2.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <SV> The type of values kept internally in state without TTL
+ * @param <TTLSV> The type of values kept internally in state with TTL
+ * @param <S> Type of originally wrapped state object
+ */
+abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKeyedState<K, N, TTLSV>>
+        extends AbstractTtlDecorator<S> implements InternalKeyedState<K, N, SV> {
+    /** This registered callback is to be called whenever state is accessed for read or write. */
+    protected AbstractTtlState(TtlStateContext<S, SV> ttlStateContext) {
+        super(ttlStateContext.original, ttlStateContext.config, ttlStateContext.timeProvider);
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        return original.asyncClear();
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void setCurrentNamespace(N namespace) {
+        original.setCurrentNamespace(namespace);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java
new file mode 100644
index 00000000000..45f235758e5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * This class wraps aggregating function with TTL logic.
+ *
+ * @param <IN> The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> The type of the aggregated result
+ */
+public class TtlAggregateFunction<IN, ACC, OUT>
+        extends AbstractTtlDecorator<AggregateFunction<IN, ACC, OUT>>
+        implements AggregateFunction<IN, TtlValue<ACC>, OUT> {
+
+    public TtlAggregateFunction(
+            AggregateFunction<IN, ACC, OUT> aggFunction,
+            StateTtlConfig config,
+            TtlTimeProvider timeProvider) {
+        super(aggFunction, config, timeProvider);
+    }
+
+    @Override
+    public TtlValue<ACC> createAccumulator() {
+        return wrapWithTs(original.createAccumulator());
+    }
+
+    @Override
+    public TtlValue<ACC> add(IN value, TtlValue<ACC> accumulator) {
+        ACC userAcc = getUnexpired(accumulator);
+        userAcc = userAcc == null ? original.createAccumulator() : userAcc;
+        return wrapWithTs(original.add(value, userAcc));
+    }
+
+    @Override
+    public OUT getResult(TtlValue<ACC> accumulator) {
+        ACC userAcc;
+        try {
+            userAcc = getElementWithTtlCheck(accumulator);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    "Failed to retrieve original internal aggregating state", e);
+        }
+        return userAcc == null ? null : original.getResult(userAcc);
+    }
+
+    @Override
+    public TtlValue<ACC> merge(TtlValue<ACC> a, TtlValue<ACC> b) {
+        ACC userA = getUnexpired(a);
+        ACC userB = getUnexpired(b);
+        if (userA != null && userB != null) {
+            return wrapWithTs(original.merge(userA, userB));
+        } else if (userA != null) {
+            return rewrapWithNewTs(a);
+        } else if (userB != null) {
+            return rewrapWithNewTs(b);
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java
new file mode 100644
index 00000000000..78c88cc4dfe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+        extends AbstractTtlState<
+                K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+        implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    TtlAggregatingState(
+            TtlStateContext<InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>, ACC>
+                    ttlStateContext,
+            TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(ttlStateContext);
+    }
+
+    @Override
+    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
+        return original.asyncMergeNamespaces(target, sources);
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public StateFuture<OUT> asyncGet() {
+        return original.asyncGet();
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(IN value) {
+        return original.asyncAdd(value);
+    }
+
+    @Override
+    public OUT get() {
+        return original.get();
+    }
+
+    @Override
+    public void add(IN value) {
+        original.add(value);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public StateFuture<ACC> asyncGetInternal() {
+        return original.asyncGetInternal().thenApply(ttlValue -> getElementWithTtlCheck(ttlValue));
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) {
+        return original.asyncUpdateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public ACC getInternal() {
+        TtlValue<ACC> ttlValue = original.getInternal();
+        return getElementWithTtlCheck(ttlValue);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java
new file mode 100644
index 00000000000..7cd52632866
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlUtils;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T>
+        extends AbstractTtlState<K, N, T, TtlValue<T>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+
+    protected TtlListState(
+            TtlStateContext<InternalListState<K, N, TtlValue<T>>, T> ttlStateContext) {
+        super(ttlStateContext);
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdate(List<T> values) {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        return original.asyncUpdate(withTs(values));
+    }
+
+    @Override
+    public StateFuture<Void> asyncAddAll(List<T> values) {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        return original.asyncAddAll(withTs(values));
+    }
+
+    @Override
+    public StateFuture<StateIterator<T>> asyncGet() {
+        // 1. The timestamp of elements in list state isn't updated when get even if updateTsOnRead
+        // is true.
+        // 2. we don't clear state here cause forst is LSM-tree based.
+        return original.asyncGet().thenApply(stateIter -> new AsyncIteratorWrapper(stateIter));
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(T value) {
+        return original.asyncAdd(value == null ? null : wrapWithTs(value));
+    }
+
+    @Override
+    public Iterable<T> get() {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    @Override
+    public void add(T value) {
+        original.add(value == null ? null : wrapWithTs(value));
+    }
+
+    @Override
+    public void update(List<T> values) {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.update(withTs(values));
+    }
+
+    @Override
+    public void addAll(List<T> values) {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
+        if (iterable instanceof List) {
+            return (List<E>) iterable;
+        } else {
+            List<E> list = new ArrayList<>();
+            for (E element : iterable) {
+                list.add(element);
+            }
+            return list;
+        }
+    }
+
+    private List<TtlValue<T>> withTs(List<T> values) {
+        long currentTimestamp = timeProvider.currentTimestamp();
+        List<TtlValue<T>> withTs = new ArrayList<>(values.size());
+        for (T value : values) {
+            Preconditions.checkNotNull(value, "You cannot have null element in a ListState.");
+            withTs.add(TtlUtils.wrapWithTs(value, currentTimestamp));
+        }
+        return withTs;
+    }
+
+    private class IteratorWithCleanup implements Iterator<T> {
+        private final Iterator<TtlValue<T>> originalIterator;
+        private boolean anyUnexpired = false;
+        private boolean uncleared = true;
+        private T nextUnexpired = null;
+
+        private IteratorWithCleanup(Iterator<TtlValue<T>> ttlIterator) {
+            this.originalIterator = ttlIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            findNextUnexpired();
+            cleanupIfEmpty();
+            return nextUnexpired != null;
+        }
+
+        private void cleanupIfEmpty() {
+            boolean endOfIter = !originalIterator.hasNext() && nextUnexpired == null;
+            if (uncleared && !anyUnexpired && endOfIter) {
+                original.clear();
+                uncleared = false;
+            }
+        }
+
+        @Override
+        public T next() {
+            if (hasNext()) {
+                T result = nextUnexpired;
+                nextUnexpired = null;
+                return result;
+            }
+            throw new NoSuchElementException();
+        }
+
+        // Once a null element is encountered, the subsequent elements will no longer be returned.
+        private void findNextUnexpired() {
+            while (nextUnexpired == null && originalIterator.hasNext()) {
+                TtlValue<T> ttlValue = originalIterator.next();
+                if (ttlValue == null) {
+                    break;
+                }
+                boolean unexpired = !expired(ttlValue);
+                if (unexpired) {
+                    anyUnexpired = true;
+                }
+                if (unexpired || returnExpired) {
+                    nextUnexpired = ttlValue.getUserValue();
+                }
+            }
+        }
+    }
+
+    private class AsyncIteratorWrapper implements StateIterator<T> {
+
+        private final StateIterator<TtlValue<T>> originalIterator;
+
+        public AsyncIteratorWrapper(StateIterator<TtlValue<T>> originalIterator) {
+            this.originalIterator = originalIterator;
+        }
+
+        @Override
+        public <U> StateFuture<Collection<U>> onNext(
+                Function<T, StateFuture<? extends U>> iterating) {
+            Function<TtlValue<T>, StateFuture<? extends U>> ttlIterating =
+                    (item) -> {
+                        T element = getElementWithTtlCheck(item);
+                        if (element != null) {
+                            return iterating.apply(element);
+                        } else {
+                            return null;
+                        }
+                    };
+            return originalIterator.onNext(ttlIterating);
+        }
+
+        @Override
+        public StateFuture<Void> onNext(Consumer<T> iterating) {
+            Consumer<TtlValue<T>> ttlIterating =
+                    (item) -> {
+                        T element = getElementWithTtlCheck(item);
+                        if (element != null) {
+                            iterating.accept(element);
+                        }
+                    };
+            return originalIterator.onNext(ttlIterating);
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return originalIterator.isEmpty();
+        }
+    }
+}
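
A note on the synchronous iterator above: TtlListState filters expired entries lazily while iterating and clears the whole list state only once the iteration has finished without encountering a single live element. A stripped-down, self-contained sketch of that pattern in plain Java (the FilteringIterator type and its names are illustrative, not the Flink classes):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Predicate;

    // Iterates a snapshot of a backing collection, skipping expired elements, and
    // clears the backing collection once every element is known to have expired.
    final class FilteringIterator<E> implements Iterator<E> {
        private final Collection<E> store;   // the "state" to clear if fully expired
        private final Iterator<E> snapshot;  // iterates a copy, so clearing is safe
        private final Predicate<E> expired;
        private boolean sawLive = false;
        private boolean cleared = false;
        private E next;

        FilteringIterator(Collection<E> store, Predicate<E> expired) {
            this.store = store;
            this.snapshot = new ArrayList<>(store).iterator();
            this.expired = expired;
        }

        @Override
        public boolean hasNext() {
            while (next == null && snapshot.hasNext()) {
                E candidate = snapshot.next();
                if (!expired.test(candidate)) {
                    sawLive = true;
                    next = candidate;
                }
            }
            if (!cleared && !sawLive && next == null && !snapshot.hasNext()) {
                store.clear(); // everything was expired: drop the entries eagerly
                cleared = true;
            }
            return next != null;
        }

        @Override
        public E next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            E result = next;
            next = null;
            return result;
        }
    }

For example, new FilteringIterator<>(timestamps, ts -> ts < cutoff) yields only the live timestamps and clears the list when none remain, which mirrors what IteratorWithCleanup does against the wrapped list state.
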
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java
new file mode 100644
index 00000000000..15b5e751aea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlUtils;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalMapState;
+
+import javax.annotation.Nonnull;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+        extends AbstractTtlState<K, N, UV, TtlValue<UV>, InternalMapState<K, N, UK, TtlValue<UV>>>
+        implements InternalMapState<K, N, UK, UV> {
+
+    protected TtlMapState(
+            TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, UV> ttlStateContext) {
+        super(ttlStateContext);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public StateFuture<UV> asyncGet(UK key) {
+        return original.asyncGet(key).thenApply(ttlValue -> getElementWithTtlCheck(ttlValue));
+    }
+
+    @Override
+    public StateFuture<Void> asyncPut(UK key, UV value) {
+        return original.asyncPut(key, value == null ? null : wrapWithTs(value));
+    }
+
+    @Override
+    public StateFuture<Void> asyncPutAll(Map<UK, UV> map) {
+        Map<UK, TtlValue<UV>> withTs = new HashMap();
+        for (Map.Entry<UK, UV> entry : map.entrySet()) {
+            withTs.put(
+                    entry.getKey(), entry.getValue() == null ? null : wrapWithTs(entry.getValue()));
+        }
+        return original.asyncPutAll(withTs);
+    }
+
+    @Override
+    public StateFuture<Void> asyncRemove(UK key) {
+        return original.asyncRemove(key);
+    }
+
+    @Override
+    public StateFuture<Boolean> asyncContains(UK key) {
+        return original.asyncGet(key)
+                .thenApply(ttlValue -> getElementWithTtlCheck(ttlValue) != null);
+    }
+
+    @Override
+    public StateFuture<StateIterator<Map.Entry<UK, UV>>> asyncEntries() {
+        return original.asyncEntries().thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e));
+    }
+
+    @Override
+    public StateFuture<StateIterator<UK>> asyncKeys() {
+        return original.asyncEntries()
+                .thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e.getKey()));
+    }
+
+    @Override
+    public StateFuture<StateIterator<UV>> asyncValues() {
+        return original.asyncEntries()
+                .thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e.getValue()));
+    }
+
+    @Override
+    public StateFuture<Boolean> asyncIsEmpty() {
+        // the result may be wrong if state is expired.
+        return original.asyncIsEmpty();
+    }
+
+    @Override
+    public UV get(UK key) {
+        return getElementWithTtlCheck(original.get(key));
+    }
+
+    @Override
+    public void put(UK key, UV value) {
+        original.put(key, value == null ? null : wrapWithTs(value));
+    }
+
+    @Override
+    public void putAll(Map<UK, UV> map) {
+        Map<UK, TtlValue<UV>> withTs = new HashMap();
+        long currentTimestamp = timeProvider.currentTimestamp();
+        for (Map.Entry<UK, UV> entry : map.entrySet()) {
+            withTs.put(
+                    entry.getKey(),
+                    entry.getValue() == null
+                            ? null
+                            : TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
+        }
+        original.putAll(withTs);
+    }
+
+    @Override
+    public void remove(UK key) {
+        original.remove(key);
+    }
+
+    @Override
+    public boolean contains(UK key) {
+        return getElementWithTtlCheck(original.get(key)) != null;
+    }
+
+    @Override
+    public Iterable<Map.Entry<UK, UV>> entries() {
+        return entries(e -> e);
+    }
+
+    @Override
+    public Iterable<UK> keys() {
+        return entries(e -> e.getKey());
+    }
+
+    @Override
+    public Iterable<UV> values() {
+        return entries(e -> e.getValue());
+    }
+
+    private <R> Iterable<R> entries(Function<Map.Entry<UK, UV>, R> resultMapper) {
+        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
+        return () ->
+                new EntriesIterator<>(
+                        withTs == null ? Collections.emptyList() : withTs, resultMapper);
+    }
+
+    @Override
+    public Iterator<Map.Entry<UK, UV>> iterator() {
+        return entries().iterator();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        // todo: poor performance; returning `original.isEmpty()` directly would be cheaper, but the result may be wrong once entries have expired.
+        return iterator().hasNext();
+    }
+
+    private class EntriesIterator<R> implements Iterator<R> {
+        private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+        private final Function<Map.Entry<UK, UV>, R> resultMapper;
+        private Map.Entry<UK, UV> nextUnexpired = null;
+        private boolean rightAfterNextIsCalled = false;
+
+        private EntriesIterator(
+                @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+                @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+            this.originalIterator = withTs.iterator();
+            this.resultMapper = resultMapper;
+        }
+
+        @Override
+        public boolean hasNext() {
+            rightAfterNextIsCalled = false;
+            while (nextUnexpired == null && originalIterator.hasNext()) {
+                Map.Entry<UK, TtlValue<UV>> ttlEntry = originalIterator.next();
+                UV value = getElementWithTtlCheck(ttlEntry.getValue());
+                nextUnexpired =
+                        value == null
+                                ? null
+                                : new AbstractMap.SimpleEntry<>(ttlEntry.getKey(), value);
+            }
+            return nextUnexpired != null;
+        }
+
+        @Override
+        public R next() {
+            if (hasNext()) {
+                rightAfterNextIsCalled = true;
+                R result = resultMapper.apply(nextUnexpired);
+                nextUnexpired = null;
+                return result;
+            }
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public void remove() {
+            if (rightAfterNextIsCalled) {
+                originalIterator.remove();
+            } else {
+                throw new IllegalStateException(
+                        "next() has not been called or hasNext() has been 
called afterwards,"
+                                + " remove() is supported only right after 
calling next()");
+            }
+        }
+    }
+
+    private class AsyncEntriesIterator<R> implements StateIterator<R> {
+        private final StateIterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+        private final Function<Map.Entry<UK, UV>, R> resultMapper;
+
+        public AsyncEntriesIterator(
+                @Nonnull StateIterator<Map.Entry<UK, TtlValue<UV>>> originalIterator,
+                @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+            this.originalIterator = originalIterator;
+            this.resultMapper = resultMapper;
+        }
+
+        @Override
+        public <U> StateFuture<Collection<U>> onNext(
+                Function<R, StateFuture<? extends U>> iterating) {
+            Function<Map.Entry<UK, TtlValue<UV>>, StateFuture<? extends U>> ttlIterating =
+                    (item) -> {
+                        UV value = getElementWithTtlCheck(item.getValue());
+                        if (value == null) {
+                            return null;
+                        }
+                        R result =
+                                resultMapper.apply(
+                                        new AbstractMap.SimpleEntry<>(item.getKey(), value));
+                        return iterating.apply(result);
+                    };
+            return originalIterator.onNext(ttlIterating);
+        }
+
+        @Override
+        public StateFuture<Void> onNext(Consumer<R> iterating) {
+            Consumer<Map.Entry<UK, TtlValue<UV>>> ttlIterating =
+                    (item) -> {
+                        UV value = getElementWithTtlCheck(item.getValue());
+                        if (value == null) {
+                            return;
+                        }
+                        iterating.accept(
+                                resultMapper.apply(
+                                        new AbstractMap.SimpleEntry<>(item.getKey(), value)));
+                    };
+            return originalIterator.onNext(ttlIterating);
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return false;
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java
new file mode 100644
index 00000000000..31549ecb99a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+        extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+        implements InternalReducingState<K, N, T> {
+
+    protected TtlReducingState(
+            TtlStateContext<InternalReducingState<K, N, TtlValue<T>>, T> ttlStateContext) {
+        super(ttlStateContext);
+    }
+
+    @Override
+    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
+        return original.asyncMergeNamespaces(target, sources);
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public StateFuture<T> asyncGet() {
+        return asyncGetInternal();
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(T value) {
+        return asyncUpdateInternal(value);
+    }
+
+    @Override
+    public T get() {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) {
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public StateFuture<T> asyncGetInternal() {
+        return original.asyncGetInternal().thenApply(ttlValue -> getElementWithTtlCheck(ttlValue));
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(T valueToStore) {
+        return original.asyncUpdateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public T getInternal() {
+        TtlValue<T> ttlValue = original.getInternal();
+        return getElementWithTtlCheck(ttlValue);
+    }
+
+    @Override
+    public void updateInternal(T valueToStore) {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+}
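
TtlReducingState delegates the actual merge semantics to the pre-existing TtlReduceFunction from org.apache.flink.runtime.state.ttl, which the factory below wires into the wrapped descriptor. Conceptually such a wrapper unwraps the two stored values, applies the user's reduce function, and re-stamps the result with the current time so the merged value starts a fresh TTL period. A rough, self-contained sketch of that idea with hypothetical Stamped/TimestampedReduce types (not the actual Flink classes):

    import java.util.function.BinaryOperator;
    import java.util.function.LongSupplier;

    // Hypothetical holder for (value, lastAccessTimestamp); stands in for TtlValue.
    record Stamped<V>(V value, long timestamp) {}

    // Wraps a user reduce function so the reduced result carries a fresh timestamp.
    final class TimestampedReduce<V> {
        private final BinaryOperator<V> userReduce;
        private final LongSupplier clock;

        TimestampedReduce(BinaryOperator<V> userReduce, LongSupplier clock) {
            this.userReduce = userReduce;
            this.clock = clock;
        }

        Stamped<V> reduce(Stamped<V> left, Stamped<V> right) {
            // Unwrap, apply the user function, and re-wrap with the current time.
            V merged = userReduce.apply(left.value(), right.value());
            return new Stamped<>(merged, clock.getAsLong());
        }
    }

With new TimestampedReduce<Integer>(Integer::sum, System::currentTimeMillis), reducing Stamped(1, t0) and Stamped(2, t1) yields Stamped(3, now), which is the behavior the TTL wrapper needs on the write path.
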
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
new file mode 100644
index 00000000000..1f3050734fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlReduceFunction;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
+import org.apache.flink.runtime.state.v2.ListStateDescriptor;
+import org.apache.flink.runtime.state.v2.MapStateDescriptor;
+import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
+import org.apache.flink.runtime.state.v2.StateDescriptor;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS> {
+    public static <K, N, SV, TTLSV, S extends State, IS extends S>
+            IS createStateAndWrapWithTtlIfEnabled(
+                    N defaultNamespace,
+                    TypeSerializer<N> namespaceSerializer,
+                    StateDescriptor<SV> stateDesc,
+                    AsyncKeyedStateBackend<K> stateBackend,
+                    TtlTimeProvider timeProvider)
+                    throws Exception {
+        Preconditions.checkNotNull(namespaceSerializer);
+        Preconditions.checkNotNull(stateDesc);
+        Preconditions.checkNotNull(stateBackend);
+        Preconditions.checkNotNull(timeProvider);
+        if (stateDesc.getTtlConfig().isEnabled()) {
+            if (!stateDesc.getTtlConfig().getCleanupStrategies().inRocksdbCompactFilter()) {
+                throw new UnsupportedOperationException(
+                        "Only ROCKSDB_COMPACTION_FILTER strategy is supported in state V2.");
+            }
+            if (stateDesc
+                    .getTtlConfig()
+                    .getUpdateType()
+                    .equals(StateTtlConfig.UpdateType.OnReadAndWrite)) {
+                throw new UnsupportedOperationException(
+                        "OnReadAndWrite update type is not supported in state 
V2.");
+            }
+        }
+        return stateDesc.getTtlConfig().isEnabled()
+                ? new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
+                                defaultNamespace,
+                                namespaceSerializer,
+                                stateDesc,
+                                stateBackend,
+                                timeProvider)
+                        .createState()
+                : stateBackend.createStateInternal(
+                        defaultNamespace, namespaceSerializer, stateDesc);
+    }
+
+    public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
+        // i.e. the element's serializer in the state descriptor is a TTL serializer.
+        boolean ttlSerializer = typeSerializer instanceof TtlSerializer;
+        return ttlSerializer;
+    }
+
+    private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories;
+    private N defaultNamespace;
+    private TypeSerializer<N> namespaceSerializer;
+    private StateDescriptor<SV> stateDesc;
+    private AsyncKeyedStateBackend<K> stateBackend;
+    private TtlTimeProvider timeProvider;
+
+    @Nonnull private final StateTtlConfig ttlConfig;
+    private final long ttl;
+
+    private TtlStateFactory(
+            N defaultNamespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<SV> stateDesc,
+            AsyncKeyedStateBackend<K> stateBackend,
+            TtlTimeProvider timeProvider) {
+        this.defaultNamespace = defaultNamespace;
+        this.namespaceSerializer = namespaceSerializer;
+        this.stateDesc = stateDesc;
+        this.stateBackend = stateBackend;
+        this.ttlConfig = stateDesc.getTtlConfig();
+        this.timeProvider = timeProvider;
+        this.ttl = ttlConfig.getTimeToLive().toMillis();
+        this.stateFactories = createStateFactories();
+    }
+
+    private Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> createStateFactories() {
+        return Stream.of(
+                        Tuple2.of(
+                                StateDescriptor.Type.VALUE,
+                                (SupplierWithException<IS, Exception>) this::createValueState),
+                        Tuple2.of(
+                                StateDescriptor.Type.LIST,
+                                (SupplierWithException<IS, Exception>) this::createListState),
+                        Tuple2.of(
+                                StateDescriptor.Type.MAP,
+                                (SupplierWithException<IS, Exception>) this::createMapState),
+                        Tuple2.of(
+                                StateDescriptor.Type.REDUCING,
+                                (SupplierWithException<IS, Exception>) this::createReducingState),
+                        Tuple2.of(
+                                StateDescriptor.Type.AGGREGATING,
+                                (SupplierWithException<IS, Exception>)
+                                        this::createAggregatingState))
+                .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+    }
+
+    @SuppressWarnings("unchecked")
+    private IS createState() throws Exception {
+        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getType());
+        if (stateFactory == null) {
+            String message =
+                    String.format(
+                            "State type: %s is not supported by %s",
+                            stateDesc.getType(), TtlStateFactory.class);
+            throw new FlinkRuntimeException(message);
+        }
+        return stateFactory.get();
+    }
+
+    @SuppressWarnings("unchecked")
+    private IS createValueState() throws Exception {
+        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor =
+                stateDesc.getSerializer() instanceof TtlSerializer
+                        ? (ValueStateDescriptor<TtlValue<SV>>) stateDesc
+                        : new ValueStateDescriptor<>(
+                                stateDesc.getStateId(),
+                                new TtlTypeInformation<>(
+                                        new TtlSerializer<>(
+                                                LongSerializer.INSTANCE,
+                                                stateDesc.getSerializer())));
+        return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> IS createListState() throws Exception {
+        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
+        ListStateDescriptor<TtlValue<T>> ttlDescriptor =
+                listStateDesc.getSerializer() instanceof TtlSerializer
+                        ? (ListStateDescriptor<TtlValue<T>>) stateDesc
+                        : new ListStateDescriptor<>(
+                                stateDesc.getStateId(),
+                                new TtlTypeInformation<>(
+                                        new TtlSerializer<>(
+                                                LongSerializer.INSTANCE,
+                                                
listStateDesc.getSerializer())));
+        return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <UK, UV> IS createMapState() throws Exception {
+        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
+        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor =
+                mapStateDesc.getSerializer() instanceof TtlSerializer
+                        ? (MapStateDescriptor<UK, TtlValue<UV>>) stateDesc
+                        : new MapStateDescriptor<>(
+                                stateDesc.getStateId(),
+                                mapStateDesc.getUserKeyType(),
+                                new TtlTypeInformation<>(
+                                        new TtlSerializer<>(
+                                                LongSerializer.INSTANCE,
+                                                
mapStateDesc.getSerializer())));
+        return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
+    }
+
+    @SuppressWarnings("unchecked")
+    private IS createReducingState() throws Exception {
+        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
+        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor =
+                stateDesc.getSerializer() instanceof TtlSerializer
+                        ? (ReducingStateDescriptor<TtlValue<SV>>) stateDesc
+                        : new ReducingStateDescriptor<>(
+                                stateDesc.getStateId(),
+                                new TtlReduceFunction<>(
+                                        reducingStateDesc.getReduceFunction(),
+                                        ttlConfig,
+                                        timeProvider),
+                                new TtlTypeInformation<>(
+                                        new TtlSerializer<>(
+                                                LongSerializer.INSTANCE,
+                                                stateDesc.getSerializer())));
+        return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <IN, OUT> IS createAggregatingState() throws Exception {
+        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
+                (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
+        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction =
+                new TtlAggregateFunction<>(
+                        aggregatingStateDescriptor.getAggregateFunction(), 
ttlConfig, timeProvider);
+        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor =
+                stateDesc.getSerializer() instanceof TtlSerializer
+                        ? (AggregatingStateDescriptor<IN, TtlValue<SV>, OUT>) 
stateDesc
+                        : new AggregatingStateDescriptor<>(
+                                stateDesc.getStateId(),
+                                ttlAggregateFunction,
+                                new TtlTypeInformation<>(
+                                        new TtlSerializer<>(
+                                                LongSerializer.INSTANCE,
+                                                stateDesc.getSerializer())));
+        return (IS)
+                new TtlAggregatingState<>(
+                        createTtlStateContext(ttlDescriptor), 
ttlAggregateFunction);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <OIS extends State, TTLS extends State, V, TTLV>
+            TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLV> ttlDescriptor)
+                    throws Exception {
+
+        ttlDescriptor.enableTimeToLive(
+                stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter
+        // config
+        OIS originalState =
+                (OIS)
+                        stateBackend.createStateInternal(
+                                defaultNamespace, namespaceSerializer, 
ttlDescriptor);
+        return new TtlStateContext<>(
+                originalState,
+                ttlConfig,
+                timeProvider,
+                (TypeSerializer<V>) stateDesc.getSerializer(),
+                () -> {});
+    }
+
+    public static class TtlTypeInformation<T> extends TypeInformation<TtlValue<T>> {
+
+        Class<?> typeClass;
+
+        TypeSerializer<TtlValue<T>> typeSerializer;
+
+        TtlTypeInformation(TypeSerializer<TtlValue<T>> typeSerializer) {
+            this.typeSerializer = typeSerializer;
+            typeClass = TtlValue.class;
+        }
+
+        @Override
+        public boolean isBasicType() {
+            return false;
+        }
+
+        @Override
+        public boolean isTupleType() {
+            return false;
+        }
+
+        @Override
+        public int getArity() {
+            return 2;
+        }
+
+        @Override
+        public int getTotalFields() {
+            return 2;
+        }
+
+        @Override
+        public Class<TtlValue<T>> getTypeClass() {
+            return (Class<TtlValue<T>>) typeClass;
+        }
+
+        @Override
+        public boolean isKeyType() {
+            return false;
+        }
+
+        @Override
+        public TypeSerializer<TtlValue<T>> createSerializer(SerializerConfig 
config) {
+            return typeSerializer;
+        }
+
+        @Override
+        public String toString() {
+            return "TtlTypeInformation{}";
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            return typeSerializer.equals(((TtlTypeInformation<T>) 
obj).typeSerializer);
+        }
+
+        @Override
+        public int hashCode() {
+            return typeSerializer.hashCode();
+        }
+
+        @Override
+        public boolean canEqual(Object obj) {
+            return obj instanceof TtlTypeInformation;
+        }
+    }
+
+    /**
+     * Serializer for user state value with TTL. Visibility is public for usage with external tools.
+     */
+    public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
+        private static final long serialVersionUID = 131020282727167064L;
+
+        @SuppressWarnings("WeakerAccess")
+        public TtlSerializer(
+                TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
+            super(true, timestampSerializer, userValueSerializer);
+            checkArgument(!(userValueSerializer instanceof TtlSerializer));
+        }
+
+        @SuppressWarnings("WeakerAccess")
+        public TtlSerializer(
+                PrecomputedParameters precomputed, TypeSerializer<?>... 
fieldSerializers) {
+            super(precomputed, fieldSerializers);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public TtlValue<T> createInstance(@Nonnull Object... values) {
+            Preconditions.checkArgument(values.length == 2);
+            return new TtlValue<>((T) values[1], (long) values[0]);
+        }
+
+        @Override
+        protected void setField(@Nonnull TtlValue<T> v, int index, Object 
fieldValue) {
+            throw new UnsupportedOperationException("TtlValue is immutable");
+        }
+
+        @Override
+        protected Object getField(@Nonnull TtlValue<T> v, int index) {
+            return index == 0 ? v.getLastAccessTimestamp() : v.getUserValue();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
+                PrecomputedParameters precomputed, TypeSerializer<?>... 
originalSerializers) {
+            Preconditions.checkNotNull(originalSerializers);
+            Preconditions.checkArgument(originalSerializers.length == 2);
+            return new TtlSerializer<>(precomputed, originalSerializers);
+        }
+
+        @SuppressWarnings("unchecked")
+        TypeSerializer<Long> getTimestampSerializer() {
+            return (TypeSerializer<Long>) (TypeSerializer<?>) 
fieldSerializers[0];
+        }
+
+        @SuppressWarnings("unchecked")
+        TypeSerializer<T> getValueSerializer() {
+            return (TypeSerializer<T>) fieldSerializers[1];
+        }
+
+        @Override
+        public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
+            return new TtlSerializerSnapshot<>(this);
+        }
+    }
+
+    /** A {@link TypeSerializerSnapshot} for TtlSerializer. */
+    public static final class TtlSerializerSnapshot<T>
+            extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {
+
+        private static final int VERSION = 2;
+
+        @SuppressWarnings({"WeakerAccess", "unused"})
+        public TtlSerializerSnapshot() {}
+
+        TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
+            super(serializerInstance);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return VERSION;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> 
outerSerializer) {
+            return new TypeSerializer[] {
+                outerSerializer.getTimestampSerializer(), 
outerSerializer.getValueSerializer()
+            };
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] nestedSerializers) {
+            TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0];
+            TypeSerializer<T> valueSerializer = (TypeSerializer<T>) nestedSerializers[1];
+
+            return new TtlSerializer<>(timestampSerializer, valueSerializer);
+        }
+    }
+}
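
For context on the guard at the top of createStateAndWrapWithTtlIfEnabled: with the async State API, TTL currently requires the RocksDB/ForSt compaction-filter cleanup strategy and rejects the OnReadAndWrite update type. A sketch of a descriptor configuration that should pass those checks, assuming the public StateTtlConfig builder methods; the state name, TTL duration, and compaction-filter threshold are illustrative:

    import java.time.Duration;

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.v2.ValueStateDescriptor;

    class TtlDescriptorExample {
        // Builds a V2 descriptor whose TTL config satisfies the factory checks above:
        // compaction-filter cleanup enabled and the (default) OnCreateAndWrite update type.
        static ValueStateDescriptor<Long> lastSeenDescriptor() {
            StateTtlConfig ttlConfig =
                    StateTtlConfig.newBuilder(Duration.ofMinutes(10))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .cleanupInRocksdbCompactFilter(1000L) // illustrative threshold
                            .build();
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("last-seen", TypeInformation.of(Long.class));
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }

The testValueStateWorkWithTtl case added further down takes the same route, enabling TTL on the descriptor before creating the state through the async keyed backend.
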
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java
new file mode 100644
index 00000000000..b5878c7a9ff
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2.ttl;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.state.ttl.TtlStateContext;
+import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.internal.InternalValueState;
+
+/**
+ * This class wraps value state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValueState<K, N, T>
+        extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
+        implements InternalValueState<K, N, T> {
+
+    protected TtlValueState(
+            TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> ttlStateContext) {
+        super(ttlStateContext);
+    }
+
+    @Override
+    public StateFuture<T> asyncValue() {
+        return original.asyncValue().thenApply((ttlValue) -> getElementWithTtlCheck(ttlValue));
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdate(T value) {
+        return original.asyncUpdate(value == null ? null : wrapWithTs(value));
+    }
+
+    @Override
+    public T value() {
+        TtlValue<T> ttlValue = original.value();
+        return getElementWithTtlCheck(ttlValue);
+    }
+
+    @Override
+    public void update(T value) {
+        original.update(value == null ? null : wrapWithTs(value));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
index ff01605809d..eca13bc180f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
@@ -140,6 +141,16 @@ public class StateBackendTestUtils {
             return (S) innerStateSupplier.get();
         }
 
+        @Nonnull
+        @Override
+        public <N, S extends InternalKeyedState, SV> S createStateInternal(
+                @Nonnull N defaultNamespace,
+                @Nonnull TypeSerializer<N> namespaceSerializer,
+                @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> 
stateDesc)
+                throws Exception {
+            return (S) innerStateSupplier.get();
+        }
+
         @Nonnull
         @Override
         public StateExecutor createStateExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
index bea212db17c..0426d43e519 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
@@ -233,14 +233,16 @@ class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase {
                 String key = (String) stateRequest.getRecordContext().getKey();
                 String namespace = (String) stateRequest.getNamespace();
                 if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_ADD) {
-                    hashMap.put(Tuple2.of(key, namespace), (Integer) 
stateRequest.getPayload());
-                    stateRequest.getFuture().complete(null);
+                    if (stateRequest.getPayload() == null) {
+                        hashMap.remove(Tuple2.of(key, namespace));
+                        stateRequest.getFuture().complete(null);
+                    } else {
+                        hashMap.put(Tuple2.of(key, namespace), (Integer) 
stateRequest.getPayload());
+                        stateRequest.getFuture().complete(null);
+                    }
                 } else if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_GET) {
                     Integer val = hashMap.get(Tuple2.of(key, namespace));
                     stateRequest.getFuture().complete(val);
-                } else if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_REMOVE) {
-                    hashMap.remove(Tuple2.of(key, namespace));
-                    stateRequest.getFuture().complete(null);
                 } else {
                     throw new UnsupportedOperationException("Unsupported 
type");
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index bb09983de6f..56ae6c13470 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -182,6 +183,16 @@ public class AbstractKeyedStateTestBase {
                     return null;
                 }
 
+                @Nonnull
+                @Override
+                public <N, S extends InternalKeyedState, SV> S 
createStateInternal(
+                        @Nonnull N defaultNamespace,
+                        @Nonnull TypeSerializer<N> namespaceSerializer,
+                        @Nonnull StateDescriptor<SV> stateDesc)
+                        throws Exception {
+                    return null;
+                }
+
                 @Override
                 public StateExecutor createStateExecutor() {
                     return new TestStateExecutor();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
index ac0c6785061..1c8bfa276f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
@@ -163,13 +163,13 @@ public class AbstractReducingStateTest extends AbstractKeyedStateTestBase {
                 } else if (request.getRequestType() == 
StateRequestType.REDUCING_ADD) {
                     String key = (String) request.getRecordContext().getKey();
                     String namespace = (String) request.getNamespace();
-                    hashMap.put(Tuple2.of(key, namespace), (Integer) 
request.getPayload());
-                    request.getFuture().complete(null);
-                } else if (request.getRequestType() == 
StateRequestType.REDUCING_REMOVE) {
-                    String key = (String) request.getRecordContext().getKey();
-                    String namespace = (String) request.getNamespace();
-                    hashMap.remove(Tuple2.of(key, namespace));
-                    request.getFuture().complete(null);
+                    if (request.getPayload() == null) {
+                        hashMap.remove(Tuple2.of(key, namespace));
+                        request.getFuture().complete(null);
+                    } else {
+                        hashMap.put(Tuple2.of(key, namespace), (Integer) 
request.getPayload());
+                        request.getFuture().complete(null);
+                    }
                 } else {
                     throw new UnsupportedOperationException("Unsupported 
request type");
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
index b28ca4d7fa2..321e57efed1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.state.v2;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.state.StateFutureImpl;
 import org.apache.flink.metrics.MetricGroup;
@@ -59,6 +62,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -405,6 +409,64 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> {
         }
     }
 
+    void testValueStateWorkWithTtl() throws Exception {
+        TestAsyncFrameworkExceptionHandler testExceptionHandler =
+                new TestAsyncFrameworkExceptionHandler();
+        AsyncKeyedStateBackend<Long> backend =
+                createAsyncKeyedBackend(LongSerializer.INSTANCE, 128, env);
+        AsyncExecutionController<Long> aec =
+                new AsyncExecutionController<>(
+                        new SyncMailboxExecutor(),
+                        testExceptionHandler,
+                        backend.createStateExecutor(),
+                        128,
+                        1,
+                        -1,
+                        1,
+                        null);
+        backend.setup(aec);
+        try {
+            ValueStateDescriptor<Long> kvId =
+                    new ValueStateDescriptor<>("id", 
TypeInformation.of(Long.class));
+            
kvId.enableTimeToLive(StateTtlConfig.newBuilder(Duration.ofSeconds(1)).build());
+
+            ValueState<Long> state =
+                    backend.createState(
+                            VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+            RecordContext recordContext = aec.buildContext("record-1", 1L);
+            recordContext.retain();
+            aec.setCurrentContext(recordContext);
+            state.update(1L);
+            assertThat(state.value()).isEqualTo(1L);
+            Thread.sleep(1000);
+            assertThat(state.value()).isNull();
+            recordContext.release();
+
+            RecordContext recordContext1 = aec.buildContext("record-2", 2L);
+            aec.setCurrentContext(recordContext1);
+            state.asyncUpdate(2L)
+                    .thenAccept(
+                            (val) -> {
+                                state.asyncValue()
+                                        .thenAccept(
+                                                (val1) -> {
+                                                    
assertThat(val1).isEqualTo(2);
+                                                    Thread.sleep(1000);
+                                                    state.asyncValue()
+                                                            .thenAccept(
+                                                                    (val2) -> {
+                                                                        
assertThat(val2).isNull();
+                                                                    });
+                                                });
+                            });
+            Thread.sleep(3000);
+            recordContext1.release();
+        } finally {
+            IOUtils.closeQuietly(backend);
+            backend.dispose();
+        }
+    }
+
     static class TestAsyncFrameworkExceptionHandler
             implements StateFutureImpl.AsyncFrameworkExceptionHandler {
         String message = null;
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
index 0843317cc83..abec5d6c83c 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java
@@ -148,16 +148,13 @@ public class ForStAggregatingState<K, N, IN, ACC, OUT>
     public ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?, 
?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_ADD
-                        || stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_REMOVE
                         || stateRequest.getRequestType() == 
StateRequestType.CLEAR);
         ContextKey<K, N> contextKey =
                 new ContextKey<>(
                         (RecordContext<K>) stateRequest.getRecordContext(),
                         (N) stateRequest.getNamespace());
         ACC aggregate =
-                (stateRequest.getRequestType() == StateRequestType.CLEAR
-                                || stateRequest.getRequestType()
-                                        == StateRequestType.AGGREGATING_REMOVE)
+                stateRequest.getRequestType() == StateRequestType.CLEAR
                         ? null
                         : (ACC) stateRequest.getPayload();
         return ForStDBPutRequest.of(
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java
index 4e39492ae61..a1bc9ef2f92 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java
@@ -27,10 +27,10 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlUtils;
 import org.apache.flink.runtime.state.ttl.TtlValue;
+import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -92,8 +92,26 @@ public class ForStDBTtlCompactFiltersManager {
         if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
             RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase =
                     (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
-            if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(
-                    kvMetaInfoBase.getStateSerializer())) {
+            if 
(org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer
+                    
.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) {
+                createAndSetCompactFilterFactory(metaInfoBase.getName(), 
options);
+            }
+        }
+    }
+
+    public void setAndRegisterCompactFilterIfStateTtlV2(
+            @Nonnull RegisteredStateMetaInfoBase metaInfoBase,
+            @Nonnull ColumnFamilyOptions options) {
+
+        if (metaInfoBase
+                instanceof
+                
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo) {
+            
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo
+                    kvMetaInfoBase =
+                            (org.apache.flink.runtime.state.v2
+                                            
.RegisteredKeyValueStateBackendMetaInfo)
+                                    metaInfoBase;
+            if 
(TtlStateFactory.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) {
                 createAndSetCompactFilterFactory(metaInfoBase.getName(), 
options);
             }
         }
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 4d3c82acc4a..61c62956313 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -47,12 +47,15 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.runtime.state.v2.ListStateDescriptor;
 import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
 import 
org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
+import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
 import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
@@ -95,6 +98,8 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
     /** The key groups which this state backend is responsible for. */
     private final KeyGroupRange keyGroupRange;
 
+    protected final TtlTimeProvider ttlTimeProvider;
+
     /** The key serializer. */
     protected final TypeSerializer<K> keySerializer;
 
@@ -166,6 +171,8 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
     @GuardedBy("lock")
     private boolean disposed = false;
 
+    private final ForStDBTtlCompactFiltersManager ttlCompactFiltersManager;
+
     public ForStKeyedStateBackend(
             UUID backendUID,
             ForStResourceContainer optionsContainer,
@@ -183,7 +190,9 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
             PriorityQueueSetFactory priorityQueueFactory,
             CloseableRegistry cancelStreamRegistry,
             ForStNativeMetricMonitor nativeMetricMonitor,
-            InternalKeyContext<K> keyContext) {
+            InternalKeyContext<K> keyContext,
+            TtlTimeProvider ttlTimeProvider,
+            ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) {
         this.backendUID = backendUID;
         this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
@@ -199,6 +208,8 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
         this.snapshotStrategy = snapshotStrategy;
         this.cancelStreamRegistry = cancelStreamRegistry;
         this.nativeMetricMonitor = nativeMetricMonitor;
+        this.ttlTimeProvider = ttlTimeProvider;
+        this.ttlCompactFiltersManager = ttlCompactFiltersManager;
         this.managedStateExecutors = new HashSet<>(1);
         this.priorityQueueFactory = priorityQueueFactory;
         if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) {
@@ -226,6 +237,18 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
             @Nonnull TypeSerializer<N> namespaceSerializer,
             @Nonnull StateDescriptor<SV> stateDesc)
             throws Exception {
+        return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
+                defaultNamespace, namespaceSerializer, stateDesc, this, 
ttlTimeProvider);
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends InternalKeyedState, SV> S createStateInternal(
+            @Nonnull N defaultNamespace,
+            @Nonnull TypeSerializer<N> namespaceSerializer,
+            @Nonnull StateDescriptor<SV> stateDesc)
+            throws Exception {
         Preconditions.checkNotNull(
                 stateRequestHandler,
                 "A non-null stateRequestHandler must be setup before 
createState");
@@ -247,6 +270,7 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
                                 namespaceSerializer::duplicate,
                                 valueSerializerView,
                                 valueDeserializerView);
+
             case LIST:
                 return (S)
                         new ForStListState<>(
@@ -335,8 +359,12 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
                             
StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());
 
             newStateInfo =
-                    ForStOperationUtils.createStateInfo(
-                            newMetaInfo, db, columnFamilyOptionsFactory);
+                    ForStOperationUtils.createAsyncStateInfo(
+                            newMetaInfo,
+                            db,
+                            columnFamilyOptionsFactory,
+                            ttlCompactFiltersManager,
+                            optionsContainer.getWriteBufferManagerCapacity());
             ForStOperationUtils.registerKvStateInformation(
                     this.kvStateInformation,
                     this.nativeMetricMonitor,
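
A note for readers tracing the new hook end to end: the sketch below shows how TTL would typically be switched on from the descriptor side, so that the createState path above routes through TtlStateFactory.createStateAndWrapWithTtlIfEnabled instead of returning the raw ForSt state. This is a minimal sketch, not code from this commit; it assumes the v2 ValueStateDescriptor keeps the (name, TypeInformation) constructor and the enableTimeToLive(StateTtlConfig) setter of its v1 counterpart, and that the Duration-based StateTtlConfig.newBuilder overload is available.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.v2.ValueStateDescriptor;

    import java.time.Duration;

    public class TtlV2DescriptorSketch {
        public static void main(String[] args) {
            // Expire entries 10 minutes after they were last written.
            StateTtlConfig ttlConfig =
                    StateTtlConfig.newBuilder(Duration.ofMinutes(10)).build();

            // Assumed v2 descriptor shape; see the hedging note above.
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("counter", Types.LONG);
            descriptor.enableTimeToLive(ttlConfig);

            // With TTL enabled on the descriptor, a createState(...) call on the
            // backend would hand back a TtlValueState wrapper rather than the
            // plain ForStValueState.
            System.out.println(ttlConfig.isEnabled());
        }
    }
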
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index db57bfc20d0..7568dd637f8 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateBackendBuilder;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
 import 
org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation;
 import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation;
@@ -101,6 +102,7 @@ public class ForStKeyedStateBackendBuilder<K>
 
     private final int numberOfKeyGroups;
     private final KeyGroupRange keyGroupRange;
+    private final TtlTimeProvider ttlTimeProvider;
 
     private final Collection<KeyedStateHandle> restoreStateHandles;
 
@@ -128,6 +130,7 @@ public class ForStKeyedStateBackendBuilder<K>
             int numberOfKeyGroups,
             KeyGroupRange keyGroupRange,
             ForStPriorityQueueConfig priorityQueueConfig,
+            TtlTimeProvider ttlTimeProvider,
             MetricGroup metricGroup,
             StateBackend.CustomInitializationMetrics 
customInitializationMetrics,
             @Nonnull Collection<KeyedStateHandle> stateHandles,
@@ -141,6 +144,7 @@ public class ForStKeyedStateBackendBuilder<K>
         this.numberOfKeyGroups = numberOfKeyGroups;
         this.keyGroupRange = keyGroupRange;
         this.priorityQueueConfig = priorityQueueConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
         this.metricGroup = metricGroup;
         this.customInitializationMetrics = customInitializationMetrics;
         this.restoreStateHandles = stateHandles;
@@ -172,6 +176,12 @@ public class ForStKeyedStateBackendBuilder<K>
         LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates =
                 new LinkedHashMap<>();
 
+        ForStDBTtlCompactFiltersManager ttlCompactFiltersManager =
+                new ForStDBTtlCompactFiltersManager(
+                        ttlTimeProvider,
+                        optionsContainer.getQueryTimeAfterNumEntries(),
+                        optionsContainer.getPeriodicCompactionTime());
+
         RocksDB db = null;
         ForStRestoreOperation restoreOperation = null;
         // Number of bytes required to prefix the key groups.
@@ -277,7 +287,9 @@ public class ForStKeyedStateBackendBuilder<K>
                 priorityQueueFactory,
                 cancelStreamRegistryForBackend,
                 nativeMetricMonitor,
-                keyContext);
+                keyContext,
+                ttlTimeProvider,
+                ttlCompactFiltersManager);
     }
 
     private ForStRestoreOperation getForStRestoreOperation(
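
On the newly threaded-through TtlTimeProvider: it is the clock that both the TTL wrappers and the compaction filter manager consult, and injecting it through the builder (rather than hard-coding wall-clock time) is what keeps expiration testable. The following is a minimal sketch, not from this commit, assuming TtlTimeProvider is a single-method interface with long currentTimestamp() and a wall-clock DEFAULT:

    import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

    public class ManualTtlClockSketch {
        public static void main(String[] args) {
            long[] now = {0L};
            // Production code passes TtlTimeProvider.DEFAULT (wall clock); a test
            // can inject a manual clock and advance it to push entries past their TTL.
            TtlTimeProvider manualClock = () -> now[0];

            now[0] = 42L;
            System.out.println(manualClock.currentTimestamp()); // 42
        }
    }
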
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java
index f852a0bb8de..f69bb7ec73a 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
 import org.apache.flink.state.forst.sync.ForStIteratorWrapper;
@@ -277,7 +278,13 @@ public class ForStOperationUtils {
                 createColumnFamilyOptions(columnFamilyOptionsFactory, 
metaInfoBase.getName());
 
         if (ttlCompactFiltersManager != null) {
-            
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, 
options);
+            if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
+                ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(
+                        metaInfoBase, options);
+            } else {
+                ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2(
+                        metaInfoBase, options);
+            }
         }
 
         if (writeBufferManagerCapacity != null) {
@@ -378,6 +385,28 @@ public class ForStOperationUtils {
         return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
     }
 
+    public static ForStKvStateInfo createAsyncStateInfo(
+            RegisteredStateMetaInfoBase metaInfoBase,
+            RocksDB db,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            @Nullable ForStDBTtlCompactFiltersManager ttlCompactFiltersManager,
+            @Nullable Long writeBufferManagerCapacity) {
+
+        ColumnFamilyDescriptor columnFamilyDescriptor =
+                createColumnFamilyDescriptor(
+                        metaInfoBase,
+                        columnFamilyOptionsFactory,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
+        try {
+            ColumnFamilyHandle columnFamilyHandle = 
createColumnFamily(columnFamilyDescriptor, db);
+            return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
+        } catch (Exception ex) {
+            IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
+            throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", ex);
+        }
+    }
+
     private static void throwExceptionIfPathLengthExceededOnWindows(String 
path, Exception cause)
             throws IOException {
         // max directory path length on Windows is 247.
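
One detail of createAsyncStateInfo worth calling out is its failure path: if the column family cannot be created, the ColumnFamilyOptions held by the descriptor are closed before the exception is rethrown, because they own native memory the GC will not reclaim. The sketch below illustrates only that acquire-or-release shape; NativeOptions and NativeHandle are hypothetical stand-ins, not ForSt or RocksDB classes.

    public class CloseOnFailureSketch {
        /** Hypothetical stand-in for ColumnFamilyOptions: owns native memory. */
        static final class NativeOptions implements AutoCloseable {
            @Override
            public void close() {
                System.out.println("native options released");
            }
        }

        /** Hypothetical stand-in for ColumnFamilyHandle. */
        static final class NativeHandle {}

        static NativeHandle createHandle(NativeOptions options, boolean fail) {
            try {
                if (fail) {
                    throw new RuntimeException("column family creation failed");
                }
                // Success: the caller now owns both the handle and the options.
                return new NativeHandle();
            } catch (Exception ex) {
                // Failure: release the native options before surfacing the error,
                // mirroring the IOUtils.closeQuietly(...) call in createAsyncStateInfo.
                options.close();
                throw new RuntimeException("Error creating ColumnFamilyHandle.", ex);
            }
        }

        public static void main(String[] args) {
            createHandle(new NativeOptions(), false); // succeeds, nothing released
            try {
                createHandle(new NativeOptions(), true); // fails, options released first
            } catch (RuntimeException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }
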
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
index 5043b483588..da6e1dfbb7c 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
@@ -139,15 +139,13 @@ public class ForStReducingState<K, N, V> extends 
AbstractReducingState<K, N, V>
     public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?, 
?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.REDUCING_ADD
-                        || stateRequest.getRequestType() == 
StateRequestType.REDUCING_REMOVE
                         || stateRequest.getRequestType() == 
StateRequestType.CLEAR);
         ContextKey<K, N> contextKey =
                 new ContextKey<>(
                         (RecordContext<K>) stateRequest.getRecordContext(),
                         (N) stateRequest.getNamespace());
         V value =
-                (stateRequest.getRequestType() == 
StateRequestType.REDUCING_REMOVE
-                                || stateRequest.getRequestType() == 
StateRequestType.CLEAR)
+                stateRequest.getRequestType() == StateRequestType.CLEAR
                         ? null // "Delete(key)" is equivalent to "Put(key, 
null)"
                         : (V) stateRequest.getPayload();
         return ForStDBPutRequest.of(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 6e61ecd3920..0826b29ca21 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -363,6 +363,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                                 parameters.getNumberOfKeyGroups(),
                                 parameters.getKeyGroupRange(),
                                 priorityQueueConfig,
+                                parameters.getTtlTimeProvider(),
                                 parameters.getMetricGroup(),
                                 parameters.getCustomInitializationMetrics(),
                                 parameters.getStateHandles(),
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
index db768bcc18f..b9d08f504d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
 import org.apache.flink.table.dataview.ListViewTypeInfo;
 import org.apache.flink.table.dataview.MapViewTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
@@ -82,7 +83,8 @@ public class TypeInfoTestCoverageTest extends TestLogger {
                         BigDecimalTypeInfo.class.getName(),
                         DecimalDataTypeInfo.class.getName(),
                         GenericRecordAvroTypeInfo.class.getName(),
-                        AvroTypeInfo.class.getName());
+                        AvroTypeInfo.class.getName(),
+                        TtlStateFactory.TtlTypeInformation.class.getName());
 
         // check if a test exists for each type information
         for (Class<? extends TypeInformation> typeInfo : typeInfos) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index f4d1b53a486..494ee2e827e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -145,6 +145,8 @@ public class TypeSerializerTestCoverageTest extends 
TestLogger {
                         CoGroupedStreams.UnionSerializer.class.getName(),
                         TtlStateFactory.TtlSerializer.class.getName(),
                         TtlAwareSerializer.class.getName(),
+                        
org.apache.flink.runtime.state.v2.ttl.TtlStateFactory.TtlSerializer.class
+                                .getName(),
                         TimeWindow.Serializer.class.getName(),
                         
InternalTimersSnapshotReaderWriters.LegacyTimerSerializer.class.getName(),
                         
TwoPhaseCommitSinkFunction.StateSerializer.class.getName(),
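
Finally, since the new TtlSerializer and the TTL state wrappers registered above all revolve around a stored last-access timestamp, here is a minimal sketch of the expiration check they are built around. TtlValue is a hand-rolled stand-in for the <user value, last access timestamp> pair, the overflow handling of the real TtlUtils is omitted, and the semantics shown (an entry expires once lastAccessTimestamp + ttl <= now) is the assumption this sketch encodes.

    public class TtlExpirySketch {
        /** Hand-rolled stand-in for the value/timestamp pair persisted by the TTL serializer. */
        static final class TtlValue<T> {
            final T userValue;
            final long lastAccessTimestamp;

            TtlValue(T userValue, long lastAccessTimestamp) {
                this.userValue = userValue;
                this.lastAccessTimestamp = lastAccessTimestamp;
            }
        }

        /** Expired once the last access plus the TTL is not after "now". */
        static boolean expired(TtlValue<?> value, long ttlMillis, long nowMillis) {
            return value.lastAccessTimestamp + ttlMillis <= nowMillis;
        }

        public static void main(String[] args) {
            TtlValue<String> entry = new TtlValue<>("hello", 1_000L);
            System.out.println(expired(entry, 500L, 1_400L)); // false: expires at 1_500
            System.out.println(expired(entry, 500L, 1_600L)); // true: 1_500 <= 1_600
        }
    }
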
