This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2568dee63138899cb80982a9659ab25f0d38c2c Author: Xiangyu Feng <[email protected]> AuthorDate: Thu Feb 6 14:00:54 2025 +0800 [FLINK-35780][state] Support state migration between disabling and enabling state ttl in RocksDBKeyedStateBackend (#25035) --- .../runtime/state/StateSerializerProvider.java | 22 ++++-- .../runtime/state/heap/HeapKeyedStateBackend.java | 12 +++ .../state/StateBackendMigrationTestBase.java | 15 ++-- .../runtime/state/ttl/StateBackendTestContext.java | 6 +- .../flink/runtime/state/ttl/TtlStateTestBase.java | 4 +- .../ChangelogStateBackendMigrationTest.java | 14 ++++ .../flink/state/rocksdb/AbstractRocksDBState.java | 24 +++++- .../state/rocksdb/RocksDBAggregatingState.java | 3 +- .../state/rocksdb/RocksDBKeyedStateBackend.java | 72 ++++++++++++----- .../flink/state/rocksdb/RocksDBListState.java | 39 +++++---- .../flink/state/rocksdb/RocksDBMapState.java | 37 ++++++--- .../flink/state/rocksdb/RocksDBOperationUtils.java | 2 +- .../flink/state/rocksdb/RocksDBReducingState.java | 3 +- .../flink/state/rocksdb/RocksDBValueState.java | 3 +- .../ttl/RocksDbTtlCompactFiltersManager.java | 5 +- .../EmbeddedRocksDBStateBackendMigrationTest.java | 92 ++++++++++++++++++++++ .../rocksdb/RocksDBStateBackendMigrationTest.java | 92 ++++++++++++++++++++++ .../state/rocksdb/ttl/RocksDBTtlStateTestBase.java | 21 +++++ 18 files changed, 391 insertions(+), 75 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java index 4ed106e88cc..fe3aacea744 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -294,7 +296,7 @@ public abstract class StateSerializerProvider<T> { @Nonnull @Override - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "unchecked", "rawtypes"}) public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState( TypeSerializer<T> newSerializer) { checkNotNull(newSerializer); @@ -303,10 +305,14 @@ public abstract class StateSerializerProvider<T> { "A serializer has already been registered for the state; re-registration is not allowed."); } + // Use wrapped ttl serializer for compatibility check TypeSerializerSchemaCompatibility<T> result = - newSerializer + TtlAwareSerializer.wrapTtlAwareSerializer(newSerializer) .snapshotConfiguration() - .resolveSchemaCompatibility(previousSerializerSnapshot); + .resolveSchemaCompatibility( + new TtlAwareSerializerSnapshotWrapper( + previousSerializerSnapshot) + .getTtlAwareSerializerSnapshot()); if (result.isIncompatible()) { invalidateCurrentSchemaSerializerAccess(); } @@ -349,6 +355,7 @@ public abstract class StateSerializerProvider<T> { @Nonnull @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState( TypeSerializerSnapshot<T> previousSerializerSnapshot) { checkNotNull(previousSerializerSnapshot); @@ -359,10 +366,15 @@ public abstract class StateSerializerProvider<T> { this.previousSerializerSnapshot = previousSerializerSnapshot; + // Use wrapped ttl serializer for compatibility check TypeSerializerSchemaCompatibility<T> result = - Preconditions.checkNotNull(registeredSerializer) + TtlAwareSerializer.wrapTtlAwareSerializer( + Preconditions.checkNotNull(registeredSerializer)) .snapshotConfiguration() - .resolveSchemaCompatibility(previousSerializerSnapshot); + .resolveSchemaCompatibility( + new TtlAwareSerializerSnapshotWrapper( + previousSerializerSnapshot) + .getTtlAwareSerializerSnapshot()); if (result.isIncompatible()) { invalidateCurrentSchemaSerializerAccess(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index daabc9302c8..114822bac16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTran import org.apache.flink.runtime.state.StateSnapshotTransformers; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.StateMigrationException; @@ -253,6 +254,17 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { + ")."); } + // HeapKeyedStateBackend doesn't support ttl state migration currently. + if (TtlAwareSerializer.needTtlStateMigration( + previousStateSerializer, newStateSerializer)) { + throw new StateMigrationException( + "For heap backends, the new state serializer (" + + newStateSerializer + + ") must not need ttl state migration with the old state serializer (" + + previousStateSerializer + + ")."); + } + restoredKvMetaInfo = allowFutureMetadataUpdates ? restoredKvMetaInfo.withSerializerUpgradesAllowed() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index b6ac09dc2da..415d21dbf30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -154,7 +154,7 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } - private void testKeyedValueStateUpgrade( + protected void testKeyedValueStateUpgrade( ValueStateDescriptor<TestType> initialAccessDescriptor, ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception { @@ -271,7 +271,7 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } - private void testKeyedListStateUpgrade( + protected void testKeyedListStateUpgrade( ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception { @@ -431,7 +431,7 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { return set.iterator(); } - private void testKeyedMapStateUpgrade( + protected void testKeyedMapStateUpgrade( MapStateDescriptor<Integer, TestType> initialAccessDescriptor, MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception { @@ -1203,7 +1203,7 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { } @TestTemplate - void testStateMigrationAfterChangingTTLFromEnablingToDisabling() { + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { final String stateName = "test-ttl"; ValueStateDescriptor<TestType> initialAccessDescriptor = @@ -1219,17 +1219,16 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { testKeyedValueStateUpgrade( initialAccessDescriptor, newAccessDescriptorAfterRestore)) .satisfiesAnyOf( - e -> assertThat(e).isInstanceOf(IllegalStateException.class), - e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class)); + e -> assertThat(e).isInstanceOf(StateMigrationException.class), + e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } @TestTemplate - void testStateMigrationAfterChangingTTLFromDisablingToEnabling() { + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { final String stateName = "test-ttl"; ValueStateDescriptor<TestType> initialAccessDescriptor = new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); - ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore = new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer()); newAccessDescriptorAfterRestore.enableTimeToLive( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java index 597685872e4..f681c551e85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -89,7 +89,7 @@ public abstract class StateBackendTestContext { } } - void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) { + public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) { createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot); } @@ -144,7 +144,7 @@ public abstract class StateBackendTestContext { } } - KeyedStateHandle takeSnapshot() throws Exception { + public KeyedStateHandle takeSnapshot() throws Exception { SnapshotResult<KeyedStateHandle> snapshotResult = triggerSnapshot().get(); KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot(); if (jobManagerOwnedSnapshot != null) { @@ -171,7 +171,7 @@ public abstract class StateBackendTestContext { } @SuppressWarnings("unchecked") - <N, S extends State, V> S createState( + public <N, S extends State, V> S createState( StateDescriptor<S, V> stateDescriptor, @SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index 86bd99c287e..b652932d092 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -109,7 +109,7 @@ public abstract class TtlStateTestBase { return (TtlMergingStateTestContext<?, UV, ?>) ctx; } - private void initTest() throws Exception { + protected void initTest() throws Exception { initTest( StateTtlConfig.UpdateType.OnCreateAndWrite, StateTtlConfig.StateVisibility.NeverReturnExpired); @@ -496,7 +496,7 @@ public abstract class TtlStateTestBase { } @TestTemplate - void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { + protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { assumeThat(this).isNotInstanceOf(MockTtlStateTest.class); initTest(); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java index fb2b69698c0..858570bb67d 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java @@ -64,4 +64,18 @@ public class ChangelogStateBackendMigrationTest // TODO support checking key serializer return false; } + + @Override + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) { + super.testStateMigrationAfterChangingTTLFromDisablingToEnabling(); + } + } + + @Override + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) { + super.testStateMigrationAfterChangingTTLFromEnablingToDisabling(); + } + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java index d5f15d902db..faee84a9cdb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java @@ -26,6 +26,8 @@ import org.apache.flink.queryablestate.client.state.serialization.KvStateSeriali import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -36,6 +38,8 @@ import org.rocksdb.WriteOptions; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Base class for {@link State} implementations that store state in a RocksDB database. * @@ -184,12 +188,21 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, TypeSerializer<V> priorSerializer, - TypeSerializer<V> newSerializer) + TypeSerializer<V> newSerializer, + TtlTimeProvider ttlTimeProvider) throws StateMigrationException { + checkArgument(priorSerializer instanceof TtlAwareSerializer); + checkArgument(newSerializer instanceof TtlAwareSerializer); + TtlAwareSerializer<V, ?> ttlAwarePriorSerializer = + (TtlAwareSerializer<V, ?>) priorSerializer; + TtlAwareSerializer<V, ?> ttlAwareNewSerializer = (TtlAwareSerializer<V, ?>) newSerializer; try { - V value = priorSerializer.deserialize(serializedOldValueInput); - newSerializer.serialize(value, serializedMigratedValueOutput); + ttlAwareNewSerializer.migrateValueFromPriorSerializer( + ttlAwarePriorSerializer, + () -> ttlAwarePriorSerializer.deserialize(serializedOldValueInput), + serializedMigratedValueOutput, + ttlTimeProvider); } catch (Exception e) { throw new StateMigrationException("Error while trying to migrate RocksDB state.", e); } @@ -233,6 +246,11 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K return this; } + protected AbstractRocksDBState<K, N, V> setColumnFamily(ColumnFamilyHandle columnFamily) { + this.columnFamily = columnFamily; + return this; + } + @Override public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor( int recommendedMaxNumberOfReturnedRecords) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java index 0e91800fceb..52fe74cf331 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java @@ -192,6 +192,7 @@ class RocksDBAggregatingState<K, N, T, ACC, R> ((AggregatingStateDescriptor) stateDesc).getAggregateFunction()) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer(registerResult.f1.getStateSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java index edae8e5f391..4f7464bdb2f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java @@ -56,6 +56,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.state.rocksdb.iterator.RocksStateKeysAndNamespaceIterator; import org.apache.flink.state.rocksdb.iterator.RocksStateKeysIterator; @@ -88,6 +89,7 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -687,20 +689,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RegisteredKeyValueStateBackendMetaInfo<N, SV> castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, SV>) oldStateInfo.metaInfo; - newMetaInfo = - updateRestoredStateMetaInfo( - Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo), - stateDesc, - namespaceSerializer, - stateSerializer); - + Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> + newRocksDBState = + updateRestoredStateMetaInfo( + Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo), + stateDesc, + namespaceSerializer, + stateSerializer); + newMetaInfo = newRocksDBState.f1; newMetaInfo = allowFutureMetadataUpdates ? newMetaInfo.withSerializerUpgradesAllowed() : newMetaInfo; - newRocksStateInfo = - new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo); + newRocksStateInfo = new RocksDbKvStateInfo(newRocksDBState.f0, newMetaInfo); kvStateInformation.put(stateDesc.getName(), newRocksStateInfo); sstMergeManager.register(newRocksStateInfo); } else { @@ -744,13 +746,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private <N, S extends State, SV> - RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo( - Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> - oldStateInfo, - StateDescriptor<S, SV> stateDesc, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<SV> stateSerializer) - throws Exception { + Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> + updateRestoredStateMetaInfo( + Tuple2< + ColumnFamilyHandle, + RegisteredKeyValueStateBackendMetaInfo<N, SV>> + oldStateInfo, + StateDescriptor<S, SV> stateDesc, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<SV> stateSerializer) + throws Exception { RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1; @@ -789,7 +794,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { + ")."); } - return restoredKvStateMetaInfo; + return oldStateInfo; } /** @@ -855,14 +860,43 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { DataInputDeserializer serializedValueInput = new DataInputDeserializer(); DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512); + + // Check if this is ttl state migration + TtlAwareSerializer<SV, ?> previousTtlAwareSerializer = + (TtlAwareSerializer<SV, ?>) + TtlAwareSerializer.wrapTtlAwareSerializer( + stateMetaInfo.f1.getPreviousStateSerializer()); + TtlAwareSerializer<SV, ?> currentTtlAwareSerializer = + (TtlAwareSerializer<SV, ?>) + TtlAwareSerializer.wrapTtlAwareSerializer( + stateMetaInfo.f1.getStateSerializer()); + + if (TtlAwareSerializer.needTtlStateMigration( + previousTtlAwareSerializer, currentTtlAwareSerializer)) { + // By performing ttl state migration, we need to recreate column family to + // enable/disable ttl compaction filter factory. + db.dropColumnFamily(stateMetaInfo.f0); + stateMetaInfo.f0 = + RocksDBOperationUtils.createColumnFamily( + RocksDBOperationUtils.createColumnFamilyDescriptor( + stateMetaInfo.f1, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + optionsContainer.getWriteBufferManagerCapacity()), + db, + Collections.emptyList(), + ICloseableRegistry.NO_OP); + } + while (iterator.isValid()) { serializedValueInput.setBuffer(iterator.value()); rocksDBState.migrateSerializedValue( serializedValueInput, migratedSerializedValueOutput, - stateMetaInfo.f1.getPreviousStateSerializer(), - stateMetaInfo.f1.getStateSerializer()); + previousTtlAwareSerializer, + currentTtlAwareSerializer, + this.ttlTimeProvider); batchWriter.put( stateMetaInfo.f0, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java index be426a95db5..8c6088df23d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBListState.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.state.ListDelimitedSerializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -198,24 +200,30 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, TypeSerializer<List<V>> priorSerializer, - TypeSerializer<List<V>> newSerializer) + TypeSerializer<List<V>> newSerializer, + TtlTimeProvider ttlTimeProvider) throws StateMigrationException { - - Preconditions.checkArgument(priorSerializer instanceof ListSerializer); - Preconditions.checkArgument(newSerializer instanceof ListSerializer); - - TypeSerializer<V> priorElementSerializer = - ((ListSerializer<V>) priorSerializer).getElementSerializer(); - - TypeSerializer<V> newElementSerializer = - ((ListSerializer<V>) newSerializer).getElementSerializer(); + Preconditions.checkArgument( + priorSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer); + Preconditions.checkArgument( + newSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer); + + TtlAwareSerializer<V, ?> priorTtlAwareElementSerializer = + ((TtlAwareSerializer.TtlAwareListSerializer<V>) priorSerializer) + .getElementSerializer(); + TtlAwareSerializer<V, ?> newTtlAwareElementSerializer = + ((TtlAwareSerializer.TtlAwareListSerializer<V>) newSerializer) + .getElementSerializer(); try { while (serializedOldValueInput.available() > 0) { - V element = - ListDelimitedSerializer.deserializeNextElement( - serializedOldValueInput, priorElementSerializer); - newElementSerializer.serialize(element, serializedMigratedValueOutput); + newTtlAwareElementSerializer.migrateValueFromPriorSerializer( + priorTtlAwareElementSerializer, + () -> + ListDelimitedSerializer.deserializeNextElement( + serializedOldValueInput, priorTtlAwareElementSerializer), + serializedMigratedValueOutput, + ttlTimeProvider); if (serializedOldValueInput.available() > 0) { serializedMigratedValueOutput.write(DELIMITER); } @@ -260,7 +268,8 @@ class RocksDBListState<K, N, V> extends AbstractRocksDBState<K, N, List<V>> .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer( (TypeSerializer<List<E>>) registerResult.f1.getStateSerializer()) - .setDefaultValue((List<E>) stateDesc.getDefaultValue()); + .setDefaultValue((List<E>) stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } static class StateSnapshotTransformerWrapper<T> implements StateSnapshotTransformer<byte[]> { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java index 20470c3a3dd..c811fc8caac 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMapState.java @@ -32,6 +32,8 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -225,25 +227,33 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, TypeSerializer<Map<UK, UV>> priorSerializer, - TypeSerializer<Map<UK, UV>> newSerializer) + TypeSerializer<Map<UK, UV>> newSerializer, + TtlTimeProvider ttlTimeProvider) throws StateMigrationException { - checkArgument(priorSerializer instanceof MapSerializer); - checkArgument(newSerializer instanceof MapSerializer); + checkArgument(priorSerializer instanceof TtlAwareSerializer.TtlAwareMapSerializer); + checkArgument(newSerializer instanceof TtlAwareSerializer.TtlAwareMapSerializer); - TypeSerializer<UV> priorMapValueSerializer = - ((MapSerializer<UK, UV>) priorSerializer).getValueSerializer(); - TypeSerializer<UV> newMapValueSerializer = - ((MapSerializer<UK, UV>) newSerializer).getValueSerializer(); + TtlAwareSerializer<UV, ?> priorTtlAwareMapValueSerializer = + ((TtlAwareSerializer.TtlAwareMapSerializer<UK, UV>) priorSerializer) + .getValueSerializer(); + TtlAwareSerializer<UV, ?> newTtlAwareMapValueSerializer = + ((TtlAwareSerializer.TtlAwareMapSerializer<UK, UV>) newSerializer) + .getValueSerializer(); try { boolean isNull = serializedOldValueInput.readBoolean(); - UV mapUserValue = null; - if (!isNull) { - mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput); + serializedMigratedValueOutput.writeBoolean(isNull); + + if (isNull) { + newTtlAwareMapValueSerializer.serialize(null, serializedMigratedValueOutput); + } else { + newTtlAwareMapValueSerializer.migrateValueFromPriorSerializer( + priorTtlAwareMapValueSerializer, + () -> priorTtlAwareMapValueSerializer.deserialize(serializedOldValueInput), + serializedMigratedValueOutput, + ttlTimeProvider); } - serializedMigratedValueOutput.writeBoolean(mapUserValue == null); - newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput); } catch (Exception e) { throw new StateMigrationException( "Error while trying to migrate RocksDB map state.", e); @@ -726,7 +736,8 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U .setValueSerializer( (TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer()) - .setDefaultValue((Map<UK, UV>) stateDesc.getDefaultValue()); + .setDefaultValue((Map<UK, UV>) stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java index 4cd9dff358d..0f47d9dafac 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOperationUtils.java @@ -275,7 +275,7 @@ public class RocksDBOperationUtils { .setMergeOperatorName(MERGE_OPERATOR_NAME); } - private static ColumnFamilyHandle createColumnFamily( + public static ColumnFamilyHandle createColumnFamily( ColumnFamilyDescriptor columnDescriptor, RocksDB db, List<ExportImportFilesMetaData> importFilesMetaData, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java index 3bcf21df9c8..464d55af26f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBReducingState.java @@ -182,6 +182,7 @@ class RocksDBReducingState<K, N, V> extends AbstractRocksDBAppendingState<K, N, .setReduceFunction( ((ReducingStateDescriptor<SV>) stateDesc).getReduceFunction()) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java index dfe27bf2eed..2484b831135 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBValueState.java @@ -134,6 +134,7 @@ class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V> ((RocksDBValueState<K, N, SV>) existingState) .setNamespaceSerializer(registerResult.f1.getNamespaceSerializer()) .setValueSerializer(registerResult.f1.getStateSerializer()) - .setDefaultValue(stateDesc.getDefaultValue()); + .setDefaultValue(stateDesc.getDefaultValue()) + .setColumnFamily(registerResult.f0); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java index c393a06c00f..69c7f5706c6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlAwareSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlUtils; import org.apache.flink.runtime.state.ttl.TtlValue; @@ -92,8 +92,7 @@ public class RocksDbTtlCompactFiltersManager { if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) { RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase; - if (TtlStateFactory.TtlSerializer.isTtlStateSerializer( - kvMetaInfoBase.getStateSerializer())) { + if (TtlAwareSerializer.isSerializerTtlEnabled(kvMetaInfoBase.getStateSerializer())) { createAndSetCompactFilterFactory(metaInfoBase.getName(), options); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java index 835d9ed0d59..8f94c7868b9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendMigrationTest.java @@ -18,21 +18,29 @@ package org.apache.flink.state.rocksdb; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; +import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.SupplierWithException; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -79,4 +87,88 @@ public class EmbeddedRocksDBStateBackendMigrationTest protected CheckpointStorage getCheckpointStorage() throws Exception { return storageSupplier.get(); } + + /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor<TestType> initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + ListStateDescriptor<TestType> initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessListDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessMapDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } + + /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor<TestType> initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + ListStateDescriptor<TestType> initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessListDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + initialAccessMapDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java index 6baa2c7e4a7..964b45f4e0d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendMigrationTest.java @@ -18,19 +18,27 @@ package org.apache.flink.state.rocksdb; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; +import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.TernaryBoolean; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -70,4 +78,88 @@ public class RocksDBStateBackendMigrationTest String checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); return new FileSystemCheckpointStorage(checkpointPath); } + + /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor<TestType> initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + ListStateDescriptor<TestType> initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessListDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + newAccessMapDescriptorAfterRestore.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } + + /// Todo: Move to StateBackendMigrationTestBase when all backends support ttl state migration + @TestTemplate + protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception { + final String stateName = "test-ttl"; + + ValueStateDescriptor<TestType> initialAccessDescriptor = + new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore = + new ValueStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + ListStateDescriptor<TestType> initialAccessListDescriptor = + new ListStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer()); + initialAccessListDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + ListStateDescriptor<TestType> newAccessListDescriptorAfterRestore = + new ListStateDescriptor<>( + stateName, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + MapStateDescriptor<Integer, TestType> initialAccessMapDescriptor = + new MapStateDescriptor<>( + stateName, IntSerializer.INSTANCE, new TestType.V1TestTypeSerializer()); + initialAccessMapDescriptor.enableTimeToLive( + StateTtlConfig.newBuilder(Duration.ofDays(1)).build()); + MapStateDescriptor<Integer, TestType> newAccessMapDescriptorAfterRestore = + new MapStateDescriptor<>( + stateName, + IntSerializer.INSTANCE, + // restore with a V2 serializer that has a different schema + new TestType.V2TestTypeSerializer()); + + testKeyedValueStateUpgrade(initialAccessDescriptor, newAccessDescriptorAfterRestore); + testKeyedListStateUpgrade(initialAccessListDescriptor, newAccessListDescriptorAfterRestore); + testKeyedMapStateUpgrade(initialAccessMapDescriptor, newAccessMapDescriptorAfterRestore); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java index de5d0316c2a..17d05201eaa 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/ttl/RocksDBTtlStateTestBase.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; +import org.apache.flink.runtime.state.ttl.MockTtlStateTest; import org.apache.flink.runtime.state.ttl.StateBackendTestContext; import org.apache.flink.runtime.state.ttl.TtlStateTestBase; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -41,6 +43,7 @@ import java.io.IOException; import java.nio.file.Path; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; /** Base test suite for rocksdb state TTL. */ public abstract class RocksDBTtlStateTestBase extends TtlStateTestBase { @@ -89,6 +92,24 @@ public abstract class RocksDBTtlStateTestBase extends TtlStateTestBase { testCompactFilter(false, false); } + /// Todo: Move to TtlStateTestBase when all backends support ttl state migration + @TestTemplate + @Override + protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { + assumeThat(this).isNotInstanceOf(MockTtlStateTest.class); + + initTest(); + + timeProvider.time = 0; + ctx().update(ctx().updateEmpty); + + KeyedStateHandle snapshot = sbetc.takeSnapshot(); + sbetc.createAndRestoreKeyedStateBackend(snapshot); + + sbetc.setCurrentKey("defaultKey"); + sbetc.createState(ctx().createStateDescriptor(), ""); + } + @TestTemplate public void testCompactFilterWithSnapshot() throws Exception { testCompactFilter(true, false);
