This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2568dee63138899cb80982a9659ab25f0d38c2c
Author: Xiangyu Feng <[email protected]>
AuthorDate: Thu Feb 6 14:00:54 2025 +0800

    [FLINK-35780][state] Support state migration between disabling and enabling state ttl in RocksDBKeyedStateBackend (#25035)
---
 .../runtime/state/StateSerializerProvider.java     | 22 ++++--
 .../runtime/state/heap/HeapKeyedStateBackend.java  | 12 +++
 .../state/StateBackendMigrationTestBase.java       | 15 ++--
 .../runtime/state/ttl/StateBackendTestContext.java |  6 +-
 .../flink/runtime/state/ttl/TtlStateTestBase.java  |  4 +-
 .../ChangelogStateBackendMigrationTest.java        | 14 ++++
 .../flink/state/rocksdb/AbstractRocksDBState.java  | 24 +++++-
 .../state/rocksdb/RocksDBAggregatingState.java     |  3 +-
 .../state/rocksdb/RocksDBKeyedStateBackend.java    | 72 ++++++++++++-----
 .../flink/state/rocksdb/RocksDBListState.java      | 39 +++++----
 .../flink/state/rocksdb/RocksDBMapState.java       | 37 ++++++---
 .../flink/state/rocksdb/RocksDBOperationUtils.java |  2 +-
 .../flink/state/rocksdb/RocksDBReducingState.java  |  3 +-
 .../flink/state/rocksdb/RocksDBValueState.java     |  3 +-
 .../ttl/RocksDbTtlCompactFiltersManager.java       |  5 +-
 .../EmbeddedRocksDBStateBackendMigrationTest.java  | 92 ++++++++++++++++++++++
 .../rocksdb/RocksDBStateBackendMigrationTest.java  | 92 ++++++++++++++++++++++
 .../state/rocksdb/ttl/RocksDBTtlStateTestBase.java | 21 +++++
 18 files changed, 391 insertions(+), 75 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index 4ed106e88cc..fe3aacea744 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -294,7 +296,7 @@ public abstract class StateSerializerProvider<T> {
 
         @Nonnull
         @Override
-        @SuppressWarnings("ConstantConditions")
+        @SuppressWarnings({"ConstantConditions", "unchecked", "rawtypes"})
         public TypeSerializerSchemaCompatibility<T> 
registerNewSerializerForRestoredState(
                 TypeSerializer<T> newSerializer) {
             checkNotNull(newSerializer);
@@ -303,10 +305,14 @@ public abstract class StateSerializerProvider<T> {
                         "A serializer has already been registered for the 
state; re-registration is not allowed.");
             }
 
+            // Use wrapped ttl serializer for compatibility check
             TypeSerializerSchemaCompatibility<T> result =
-                    newSerializer
+                    TtlAwareSerializer.wrapTtlAwareSerializer(newSerializer)
                             .snapshotConfiguration()
-                            
.resolveSchemaCompatibility(previousSerializerSnapshot);
+                            .resolveSchemaCompatibility(
+                                    new TtlAwareSerializerSnapshotWrapper(
+                                                    previousSerializerSnapshot)
+                                            .getTtlAwareSerializerSnapshot());
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
@@ -349,6 +355,7 @@ public abstract class StateSerializerProvider<T> {
 
         @Nonnull
         @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
         public TypeSerializerSchemaCompatibility<T> 
setPreviousSerializerSnapshotForRestoredState(
                 TypeSerializerSnapshot<T> previousSerializerSnapshot) {
             checkNotNull(previousSerializerSnapshot);
@@ -359,10 +366,15 @@ public abstract class StateSerializerProvider<T> {
 
             this.previousSerializerSnapshot = previousSerializerSnapshot;
 
+            // Use wrapped ttl serializer for compatibility check
             TypeSerializerSchemaCompatibility<T> result =
-                    Preconditions.checkNotNull(registeredSerializer)
+                    TtlAwareSerializer.wrapTtlAwareSerializer(
+                                    
Preconditions.checkNotNull(registeredSerializer))
                             .snapshotConfiguration()
-                            
.resolveSchemaCompatibility(previousSerializerSnapshot);
+                            .resolveSchemaCompatibility(
+                                    new TtlAwareSerializerSnapshotWrapper(
+                                                    previousSerializerSnapshot)
+                                            .getTtlAwareSerializerSnapshot());
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index daabc9302c8..114822bac16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran
 import org.apache.flink.runtime.state.StateSnapshotTransformers;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.StateMigrationException;
@@ -253,6 +254,17 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                 + ").");
             }
 
+            // HeapKeyedStateBackend doesn't support ttl state migration 
currently.
+            if (TtlAwareSerializer.needTtlStateMigration(
+                    previousStateSerializer, newStateSerializer)) {
+                throw new StateMigrationException(
+                        "For heap backends, the new state serializer ("
+                                + newStateSerializer
+                                + ") must not need ttl state migration with 
the old state serializer ("
+                                + previousStateSerializer
+                                + ").");
+            }
+
             restoredKvMetaInfo =
                     allowFutureMetadataUpdates
                             ? 
restoredKvMetaInfo.withSerializerUpgradesAllowed()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index b6ac09dc2da..415d21dbf30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -154,7 +154,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
                         e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }
 
-    private void testKeyedValueStateUpgrade(
+    protected void testKeyedValueStateUpgrade(
             ValueStateDescriptor<TestType> initialAccessDescriptor,
             ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore)
             throws Exception {
@@ -271,7 +271,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
                         e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }
 
-    private void testKeyedListStateUpgrade(
+    protected void testKeyedListStateUpgrade(
             ListStateDescriptor<TestType> initialAccessDescriptor,
             ListStateDescriptor<TestType> newAccessDescriptorAfterRestore)
             throws Exception {
@@ -431,7 +431,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
         return set.iterator();
     }
 
-    private void testKeyedMapStateUpgrade(
+    protected void testKeyedMapStateUpgrade(
             MapStateDescriptor<Integer, TestType> initialAccessDescriptor,
             MapStateDescriptor<Integer, TestType> 
newAccessDescriptorAfterRestore)
             throws Exception {
@@ -1203,7 +1203,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
     }
 
     @TestTemplate
-    void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
+    protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() 
throws Exception {
         final String stateName = "test-ttl";
 
         ValueStateDescriptor<TestType> initialAccessDescriptor =
@@ -1219,17 +1219,16 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
                                 testKeyedValueStateUpgrade(
                                         initialAccessDescriptor, 
newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> 
assertThat(e).isInstanceOf(IllegalStateException.class),
-                        e -> 
assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+                        e -> 
assertThat(e).isInstanceOf(StateMigrationException.class),
+                        e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }
 
     @TestTemplate
-    void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
+    protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() 
throws Exception {
         final String stateName = "test-ttl";
 
         ValueStateDescriptor<TestType> initialAccessDescriptor =
                 new ValueStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
-
         ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
                 new ValueStateDescriptor<>(stateName, new 
TestType.V2TestTypeSerializer());
         newAccessDescriptorAfterRestore.enableTimeToLive(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index 597685872e4..f681c551e85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -89,7 +89,7 @@ public abstract class StateBackendTestContext {
         }
     }
 
-    void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
+    public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
         createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
     }
 
@@ -144,7 +144,7 @@ public abstract class StateBackendTestContext {
         }
     }
 
-    KeyedStateHandle takeSnapshot() throws Exception {
+    public KeyedStateHandle takeSnapshot() throws Exception {
         SnapshotResult<KeyedStateHandle> snapshotResult = 
triggerSnapshot().get();
         KeyedStateHandle jobManagerOwnedSnapshot = 
snapshotResult.getJobManagerOwnedSnapshot();
         if (jobManagerOwnedSnapshot != null) {
@@ -171,7 +171,7 @@ public abstract class StateBackendTestContext {
     }
 
     @SuppressWarnings("unchecked")
-    <N, S extends State, V> S createState(
+    public <N, S extends State, V> S createState(
             StateDescriptor<S, V> stateDescriptor,
             @SuppressWarnings("SameParameterValue") N defaultNamespace)
             throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 86bd99c287e..b652932d092 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -109,7 +109,7 @@ public abstract class TtlStateTestBase {
         return (TtlMergingStateTestContext<?, UV, ?>) ctx;
     }
 
-    private void initTest() throws Exception {
+    protected void initTest() throws Exception {
         initTest(
                 StateTtlConfig.UpdateType.OnCreateAndWrite,
                 StateTtlConfig.StateVisibility.NeverReturnExpired);
@@ -496,7 +496,7 @@ public abstract class TtlStateTestBase {
     }
 
     @TestTemplate
-    void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
+    protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws 
Exception {
         assumeThat(this).isNotInstanceOf(MockTtlStateTest.class);
 
         initTest();
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java
index fb2b69698c0..858570bb67d 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java
@@ -64,4 +64,18 @@ public class ChangelogStateBackendMigrationTest
         // TODO support checking key serializer
         return false;
     }
+
+    @Override
+    protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() 
throws Exception {
+        if (!(this.delegatedStateBackendSupplier.get() instanceof 
EmbeddedRocksDBStateBackend)) {
+            super.testStateMigrationAfterChangingTTLFromDisablingToEnabling();
+        }
+    }
+
+    @Override
+    protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() 
throws Exception {
+        if (!(this.delegatedStateBackendSupplier.get() instanceof 
EmbeddedRocksDBStateBackend)) {
+            super.testStateMigrationAfterChangingTTLFromEnablingToDisabling();
+        }
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
index d5f15d902db..faee84a9cdb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSeriali
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -36,6 +38,8 @@ import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base class for {@link State} implementations that store state in a RocksDB 
database.
  *
@@ -184,12 +188,21 @@ public abstract class AbstractRocksDBState<K, N, V> 
implements InternalKvState<K
             DataInputDeserializer serializedOldValueInput,
             DataOutputSerializer serializedMigratedValueOutput,
             TypeSerializer<V> priorSerializer,
-            TypeSerializer<V> newSerializer)
+            TypeSerializer<V> newSerializer,
+            TtlTimeProvider ttlTimeProvider)
             throws StateMigrationException {
+        checkArgument(priorSerializer instanceof TtlAwareSerializer);
+        checkArgument(newSerializer instanceof TtlAwareSerializer);
+        TtlAwareSerializer<V, ?> ttlAwarePriorSerializer =
+                (TtlAwareSerializer<V, ?>) priorSerializer;
+        TtlAwareSerializer<V, ?> ttlAwareNewSerializer = 
(TtlAwareSerializer<V, ?>) newSerializer;
 
         try {
-            V value = priorSerializer.deserialize(serializedOldValueInput);
-            newSerializer.serialize(value, serializedMigratedValueOutput);
+            ttlAwareNewSerializer.migrateValueFromPriorSerializer(
+                    ttlAwarePriorSerializer,
+                    () -> 
ttlAwarePriorSerializer.deserialize(serializedOldValueInput),
+                    serializedMigratedValueOutput,
+                    ttlTimeProvider);
         } catch (Exception e) {
             throw new StateMigrationException("Error while trying to migrate 
RocksDB state.", e);
         }
@@ -233,6 +246,11 @@ public abstract class AbstractRocksDBState<K, N, V> 
implements InternalKvState<K
         return this;
     }
 
+    protected AbstractRocksDBState<K, N, V> setColumnFamily(ColumnFamilyHandle 
columnFamily) {
+        this.columnFamily = columnFamily;
+        return this;
+    }
+
     @Override
     public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(
             int recommendedMaxNumberOfReturnedRecords) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java
index 0e91800fceb..52fe74cf331 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java
@@ -192,6 +192,7 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
                                 ((AggregatingStateDescriptor) 
stateDesc).getAggregateFunction())
                         
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
                         
.setValueSerializer(registerResult.f1.getStateSerializer())
-                        .setDefaultValue(stateDesc.getDefaultValue());
+                        .setDefaultValue(stateDesc.getDefaultValue())
+                        .setColumnFamily(registerResult.f0);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index edae8e5f391..4f7464bdb2f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -56,6 +56,7 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import 
org.apache.flink.state.rocksdb.iterator.RocksStateKeysAndNamespaceIterator;
 import org.apache.flink.state.rocksdb.iterator.RocksStateKeysIterator;
@@ -88,6 +89,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -687,20 +689,20 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
             RegisteredKeyValueStateBackendMetaInfo<N, SV> castedMetaInfo =
                     (RegisteredKeyValueStateBackendMetaInfo<N, SV>) 
oldStateInfo.metaInfo;
 
-            newMetaInfo =
-                    updateRestoredStateMetaInfo(
-                            Tuple2.of(oldStateInfo.columnFamilyHandle, 
castedMetaInfo),
-                            stateDesc,
-                            namespaceSerializer,
-                            stateSerializer);
-
+            Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
+                    newRocksDBState =
+                            updateRestoredStateMetaInfo(
+                                    Tuple2.of(oldStateInfo.columnFamilyHandle, 
castedMetaInfo),
+                                    stateDesc,
+                                    namespaceSerializer,
+                                    stateSerializer);
+            newMetaInfo = newRocksDBState.f1;
             newMetaInfo =
                     allowFutureMetadataUpdates
                             ? newMetaInfo.withSerializerUpgradesAllowed()
                             : newMetaInfo;
 
-            newRocksStateInfo =
-                    new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, 
newMetaInfo);
+            newRocksStateInfo = new RocksDbKvStateInfo(newRocksDBState.f0, 
newMetaInfo);
             kvStateInformation.put(stateDesc.getName(), newRocksStateInfo);
             sstMergeManager.register(newRocksStateInfo);
         } else {
@@ -744,13 +746,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
     }
 
     private <N, S extends State, SV>
-            RegisteredKeyValueStateBackendMetaInfo<N, SV> 
updateRestoredStateMetaInfo(
-                    Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
-                            oldStateInfo,
-                    StateDescriptor<S, SV> stateDesc,
-                    TypeSerializer<N> namespaceSerializer,
-                    TypeSerializer<SV> stateSerializer)
-                    throws Exception {
+            Tuple2<ColumnFamilyHandle, 
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
+                    updateRestoredStateMetaInfo(
+                            Tuple2<
+                                            ColumnFamilyHandle,
+                                            
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
+                                    oldStateInfo,
+                            StateDescriptor<S, SV> stateDesc,
+                            TypeSerializer<N> namespaceSerializer,
+                            TypeSerializer<SV> stateSerializer)
+                            throws Exception {
 
         RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo 
= oldStateInfo.f1;
 
@@ -789,7 +794,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                             + ").");
         }
 
-        return restoredKvStateMetaInfo;
+        return oldStateInfo;
     }
 
     /**
@@ -855,14 +860,43 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
             DataInputDeserializer serializedValueInput = new 
DataInputDeserializer();
             DataOutputSerializer migratedSerializedValueOutput = new 
DataOutputSerializer(512);
+
+            // Check if this is ttl state migration
+            TtlAwareSerializer<SV, ?> previousTtlAwareSerializer =
+                    (TtlAwareSerializer<SV, ?>)
+                            TtlAwareSerializer.wrapTtlAwareSerializer(
+                                    
stateMetaInfo.f1.getPreviousStateSerializer());
+            TtlAwareSerializer<SV, ?> currentTtlAwareSerializer =
+                    (TtlAwareSerializer<SV, ?>)
+                            TtlAwareSerializer.wrapTtlAwareSerializer(
+                                    stateMetaInfo.f1.getStateSerializer());
+
+            if (TtlAwareSerializer.needTtlStateMigration(
+                    previousTtlAwareSerializer, currentTtlAwareSerializer)) {
+                // By performing ttl state migration, we need to recreate 
column family to
+                // enable/disable ttl compaction filter factory.
+                db.dropColumnFamily(stateMetaInfo.f0);
+                stateMetaInfo.f0 =
+                        RocksDBOperationUtils.createColumnFamily(
+                                
RocksDBOperationUtils.createColumnFamilyDescriptor(
+                                        stateMetaInfo.f1,
+                                        columnFamilyOptionsFactory,
+                                        ttlCompactFiltersManager,
+                                        
optionsContainer.getWriteBufferManagerCapacity()),
+                                db,
+                                Collections.emptyList(),
+                                ICloseableRegistry.NO_OP);
+            }
+
             while (iterator.isValid()) {
                 serializedValueInput.setBuffer(iterator.value());
 
                 rocksDBState.migrateSerializedValue(
                         serializedValueInput,
                         migratedSerializedValueOutput,
-                        stateMetaInfo.f1.getPreviousStateSerializer(),
-                        stateMetaInfo.f1.getStateSerializer());
+                        previousTtlAwareSerializer,
+                        currentTtlAwareSerializer,
+                        this.ttlTimeProvider);
 
                 batchWriter.put(
                         stateMetaInfo.f0,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java
index be426a95db5..8c6088df23d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.state.ListDelimitedSerializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -198,24 +200,30 @@ class RocksDBListState<K, N, V> extends 
AbstractRocksDBState<K, N, List<V>>
             DataInputDeserializer serializedOldValueInput,
             DataOutputSerializer serializedMigratedValueOutput,
             TypeSerializer<List<V>> priorSerializer,
-            TypeSerializer<List<V>> newSerializer)
+            TypeSerializer<List<V>> newSerializer,
+            TtlTimeProvider ttlTimeProvider)
             throws StateMigrationException {
-
-        Preconditions.checkArgument(priorSerializer instanceof ListSerializer);
-        Preconditions.checkArgument(newSerializer instanceof ListSerializer);
-
-        TypeSerializer<V> priorElementSerializer =
-                ((ListSerializer<V>) priorSerializer).getElementSerializer();
-
-        TypeSerializer<V> newElementSerializer =
-                ((ListSerializer<V>) newSerializer).getElementSerializer();
+        Preconditions.checkArgument(
+                priorSerializer instanceof 
TtlAwareSerializer.TtlAwareListSerializer);
+        Preconditions.checkArgument(
+                newSerializer instanceof 
TtlAwareSerializer.TtlAwareListSerializer);
+
+        TtlAwareSerializer<V, ?> priorTtlAwareElementSerializer =
+                ((TtlAwareSerializer.TtlAwareListSerializer<V>) 
priorSerializer)
+                        .getElementSerializer();
+        TtlAwareSerializer<V, ?> newTtlAwareElementSerializer =
+                ((TtlAwareSerializer.TtlAwareListSerializer<V>) newSerializer)
+                        .getElementSerializer();
 
         try {
             while (serializedOldValueInput.available() > 0) {
-                V element =
-                        ListDelimitedSerializer.deserializeNextElement(
-                                serializedOldValueInput, 
priorElementSerializer);
-                newElementSerializer.serialize(element, 
serializedMigratedValueOutput);
+                newTtlAwareElementSerializer.migrateValueFromPriorSerializer(
+                        priorTtlAwareElementSerializer,
+                        () ->
+                                ListDelimitedSerializer.deserializeNextElement(
+                                        serializedOldValueInput, 
priorTtlAwareElementSerializer),
+                        serializedMigratedValueOutput,
+                        ttlTimeProvider);
                 if (serializedOldValueInput.available() > 0) {
                     serializedMigratedValueOutput.write(DELIMITER);
                 }
@@ -260,7 +268,8 @@ class RocksDBListState<K, N, V> extends 
AbstractRocksDBState<K, N, List<V>>
                         
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
                         .setValueSerializer(
                                 (TypeSerializer<List<E>>) 
registerResult.f1.getStateSerializer())
-                        .setDefaultValue((List<E>) 
stateDesc.getDefaultValue());
+                        .setDefaultValue((List<E>) stateDesc.getDefaultValue())
+                        .setColumnFamily(registerResult.f0);
     }
 
     static class StateSnapshotTransformerWrapper<T> implements 
StateSnapshotTransformer<byte[]> {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java
index 20470c3a3dd..c811fc8caac 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -225,25 +227,33 @@ class RocksDBMapState<K, N, UK, UV> extends 
AbstractRocksDBState<K, N, Map<UK, U
             DataInputDeserializer serializedOldValueInput,
             DataOutputSerializer serializedMigratedValueOutput,
             TypeSerializer<Map<UK, UV>> priorSerializer,
-            TypeSerializer<Map<UK, UV>> newSerializer)
+            TypeSerializer<Map<UK, UV>> newSerializer,
+            TtlTimeProvider ttlTimeProvider)
             throws StateMigrationException {
 
-        checkArgument(priorSerializer instanceof MapSerializer);
-        checkArgument(newSerializer instanceof MapSerializer);
+        checkArgument(priorSerializer instanceof 
TtlAwareSerializer.TtlAwareMapSerializer);
+        checkArgument(newSerializer instanceof 
TtlAwareSerializer.TtlAwareMapSerializer);
 
-        TypeSerializer<UV> priorMapValueSerializer =
-                ((MapSerializer<UK, UV>) priorSerializer).getValueSerializer();
-        TypeSerializer<UV> newMapValueSerializer =
-                ((MapSerializer<UK, UV>) newSerializer).getValueSerializer();
+        TtlAwareSerializer<UV, ?> priorTtlAwareMapValueSerializer =
+                ((TtlAwareSerializer.TtlAwareMapSerializer<UK, UV>) 
priorSerializer)
+                        .getValueSerializer();
+        TtlAwareSerializer<UV, ?> newTtlAwareMapValueSerializer =
+                ((TtlAwareSerializer.TtlAwareMapSerializer<UK, UV>) 
newSerializer)
+                        .getValueSerializer();
 
         try {
             boolean isNull = serializedOldValueInput.readBoolean();
-            UV mapUserValue = null;
-            if (!isNull) {
-                mapUserValue = 
priorMapValueSerializer.deserialize(serializedOldValueInput);
+            serializedMigratedValueOutput.writeBoolean(isNull);
+
+            if (isNull) {
+                newTtlAwareMapValueSerializer.serialize(null, 
serializedMigratedValueOutput);
+            } else {
+                newTtlAwareMapValueSerializer.migrateValueFromPriorSerializer(
+                        priorTtlAwareMapValueSerializer,
+                        () -> 
priorTtlAwareMapValueSerializer.deserialize(serializedOldValueInput),
+                        serializedMigratedValueOutput,
+                        ttlTimeProvider);
             }
-            serializedMigratedValueOutput.writeBoolean(mapUserValue == null);
-            newMapValueSerializer.serialize(mapUserValue, 
serializedMigratedValueOutput);
         } catch (Exception e) {
             throw new StateMigrationException(
                     "Error while trying to migrate RocksDB map state.", e);
@@ -726,7 +736,8 @@ class RocksDBMapState<K, N, UK, UV> extends 
AbstractRocksDBState<K, N, Map<UK, U
                         .setValueSerializer(
                                 (TypeSerializer<Map<UK, UV>>)
                                         registerResult.f1.getStateSerializer())
-                        .setDefaultValue((Map<UK, UV>) 
stateDesc.getDefaultValue());
+                        .setDefaultValue((Map<UK, UV>) 
stateDesc.getDefaultValue())
+                        .setColumnFamily(registerResult.f0);
     }
 
     /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java
index 4cd9dff358d..0f47d9dafac 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java
@@ -275,7 +275,7 @@ public class RocksDBOperationUtils {
                 .setMergeOperatorName(MERGE_OPERATOR_NAME);
     }
 
-    private static ColumnFamilyHandle createColumnFamily(
+    public static ColumnFamilyHandle createColumnFamily(
             ColumnFamilyDescriptor columnDescriptor,
             RocksDB db,
             List<ExportImportFilesMetaData> importFilesMetaData,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java
index 3bcf21df9c8..464d55af26f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java
@@ -182,6 +182,7 @@ class RocksDBReducingState<K, N, V> extends 
AbstractRocksDBAppendingState<K, N,
                         .setReduceFunction(
                                 ((ReducingStateDescriptor<SV>) 
stateDesc).getReduceFunction())
                         
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
-                        .setDefaultValue(stateDesc.getDefaultValue());
+                        .setDefaultValue(stateDesc.getDefaultValue())
+                        .setColumnFamily(registerResult.f0);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java
index dfe27bf2eed..2484b831135 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java
@@ -134,6 +134,7 @@ class RocksDBValueState<K, N, V> extends 
AbstractRocksDBState<K, N, V>
                 ((RocksDBValueState<K, N, SV>) existingState)
                         
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
                         
.setValueSerializer(registerResult.f1.getStateSerializer())
-                        .setDefaultValue(stateDesc.getDefaultValue());
+                        .setDefaultValue(stateDesc.getDefaultValue())
+                        .setColumnFamily(registerResult.f0);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java
index c393a06c00f..69c7f5706c6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlUtils;
 import org.apache.flink.runtime.state.ttl.TtlValue;
@@ -92,8 +92,7 @@ public class RocksDbTtlCompactFiltersManager {
         if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
             RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase =
                     (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
-            if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(
-                    kvMetaInfoBase.getStateSerializer())) {
+            if 
(TtlAwareSerializer.isSerializerTtlEnabled(kvMetaInfoBase.getStateSerializer()))
 {
                 createAndSetCompactFilterFactory(metaInfoBase.getName(), 
options);
             }
         }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java
index 835d9ed0d59..8f94c7868b9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java
@@ -18,21 +18,29 @@
 
 package org.apache.flink.state.rocksdb;
 
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackendMigrationTestBase;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
@@ -79,4 +87,88 @@ public class EmbeddedRocksDBStateBackendMigrationTest
     protected CheckpointStorage getCheckpointStorage() throws Exception {
         return storageSupplier.get();
     }
+
+    /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration
+    @TestTemplate
+    protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() 
throws Exception {
+        final String stateName = "test-ttl";
+
+        ValueStateDescriptor<TestType> initialAccessDescriptor =
+                new ValueStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
+                new ValueStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        ListStateDescriptor<TestType> initialAccessListDescriptor =
+                new ListStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
+                new ListStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessListDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
+                new MapStateDescriptor<>(
+                        stateName, IntSerializer.INSTANCE, new 
TestType.V1TestTypeSerializer());
+        MapStateDescriptor<Integer, TestType> 
newAccessMapDescriptorAfterRestore =
+                new MapStateDescriptor<>(
+                        stateName,
+                        IntSerializer.INSTANCE,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessMapDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        testKeyedValueStateUpgrade(initialAccessDescriptor, 
newAccessDescriptorAfterRestore);
+        testKeyedListStateUpgrade(initialAccessListDescriptor, 
newAccessListDescriptorAfterRestore);
+        testKeyedMapStateUpgrade(initialAccessMapDescriptor, 
newAccessMapDescriptorAfterRestore);
+    }
+
+    /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration
+    @TestTemplate
+    protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() 
throws Exception {
+        final String stateName = "test-ttl";
+
+        ValueStateDescriptor<TestType> initialAccessDescriptor =
+                new ValueStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        initialAccessDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
+                new ValueStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        ListStateDescriptor<TestType> initialAccessListDescriptor =
+                new ListStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        initialAccessListDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
+                new ListStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
+                new MapStateDescriptor<>(
+                        stateName, IntSerializer.INSTANCE, new 
TestType.V1TestTypeSerializer());
+        initialAccessMapDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        MapStateDescriptor<Integer, TestType> 
newAccessMapDescriptorAfterRestore =
+                new MapStateDescriptor<>(
+                        stateName,
+                        IntSerializer.INSTANCE,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        testKeyedValueStateUpgrade(initialAccessDescriptor, 
newAccessDescriptorAfterRestore);
+        testKeyedListStateUpgrade(initialAccessListDescriptor, 
newAccessListDescriptorAfterRestore);
+        testKeyedMapStateUpgrade(initialAccessMapDescriptor, 
newAccessMapDescriptorAfterRestore);
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java
index 6baa2c7e4a7..964b45f4e0d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java
@@ -18,19 +18,27 @@
 
 package org.apache.flink.state.rocksdb;
 
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackendMigrationTestBase;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.TernaryBoolean;
 
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -70,4 +78,88 @@ public class RocksDBStateBackendMigrationTest
         String checkpointPath = 
TempDirUtils.newFolder(tempFolder).toURI().toString();
         return new FileSystemCheckpointStorage(checkpointPath);
     }
+
+    /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration
+    @TestTemplate
+    protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() 
throws Exception {
+        final String stateName = "test-ttl";
+
+        ValueStateDescriptor<TestType> initialAccessDescriptor =
+                new ValueStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
+                new ValueStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        ListStateDescriptor<TestType> initialAccessListDescriptor =
+                new ListStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
+                new ListStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessListDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
+                new MapStateDescriptor<>(
+                        stateName, IntSerializer.INSTANCE, new 
TestType.V1TestTypeSerializer());
+        MapStateDescriptor<Integer, TestType> 
newAccessMapDescriptorAfterRestore =
+                new MapStateDescriptor<>(
+                        stateName,
+                        IntSerializer.INSTANCE,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+        newAccessMapDescriptorAfterRestore.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+
+        testKeyedValueStateUpgrade(initialAccessDescriptor, 
newAccessDescriptorAfterRestore);
+        testKeyedListStateUpgrade(initialAccessListDescriptor, 
newAccessListDescriptorAfterRestore);
+        testKeyedMapStateUpgrade(initialAccessMapDescriptor, 
newAccessMapDescriptorAfterRestore);
+    }
+
+    /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration
+    @TestTemplate
+    protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() 
throws Exception {
+        final String stateName = "test-ttl";
+
+        ValueStateDescriptor<TestType> initialAccessDescriptor =
+                new ValueStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        initialAccessDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
+                new ValueStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        ListStateDescriptor<TestType> initialAccessListDescriptor =
+                new ListStateDescriptor<>(stateName, new 
TestType.V1TestTypeSerializer());
+        initialAccessListDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore =
+                new ListStateDescriptor<>(
+                        stateName,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor =
+                new MapStateDescriptor<>(
+                        stateName, IntSerializer.INSTANCE, new 
TestType.V1TestTypeSerializer());
+        initialAccessMapDescriptor.enableTimeToLive(
+                StateTtlConfig.newBuilder(Duration.ofDays(1)).build());
+        MapStateDescriptor<Integer, TestType> 
newAccessMapDescriptorAfterRestore =
+                new MapStateDescriptor<>(
+                        stateName,
+                        IntSerializer.INSTANCE,
+                        // restore with a V2 serializer that has a different schema
+                        new TestType.V2TestTypeSerializer());
+
+        testKeyedValueStateUpgrade(initialAccessDescriptor, 
newAccessDescriptorAfterRestore);
+        testKeyedListStateUpgrade(initialAccessListDescriptor, 
newAccessListDescriptorAfterRestore);
+        testKeyedMapStateUpgrade(initialAccessMapDescriptor, 
newAccessMapDescriptorAfterRestore);
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java
index de5d0316c2a..17d05201eaa 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.MockTtlStateTest;
 import org.apache.flink.runtime.state.ttl.StateBackendTestContext;
 import org.apache.flink.runtime.state.ttl.TtlStateTestBase;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -41,6 +43,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Base test suite for rocksdb state TTL. */
 public abstract class RocksDBTtlStateTestBase extends TtlStateTestBase {
@@ -89,6 +92,24 @@ public abstract class RocksDBTtlStateTestBase extends 
TtlStateTestBase {
         testCompactFilter(false, false);
     }
 
+    /// Todo: Move to TtlStateTestBase when all backends support ttl state migration
+    @TestTemplate
+    @Override
+    protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws 
Exception {
+        assumeThat(this).isNotInstanceOf(MockTtlStateTest.class);
+
+        initTest();
+
+        timeProvider.time = 0;
+        ctx().update(ctx().updateEmpty);
+
+        KeyedStateHandle snapshot = sbetc.takeSnapshot();
+        sbetc.createAndRestoreKeyedStateBackend(snapshot);
+
+        sbetc.setCurrentKey("defaultKey");
+        sbetc.createState(ctx().createStateDescriptor(), "");
+    }
+
     @TestTemplate
     public void testCompactFilterWithSnapshot() throws Exception {
         testCompactFilter(true, false);

Reply via email to