Thanks for the suggestions, Zakelly! Regarding *migrateElement* - it is specifically needed for ListState, which stores its elements individually, separated by delimiters. During migration, each element is deserialized and processed one by one, so I introduced the *migrateElement* API to handle this per-element processing.
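To make the ListState case concrete, here is a rough sketch of the per-element hook as I currently picture it. This is illustrative only - the exact signature is still open for discussion, and the sub-interface name below exists just for the sketch; in the proposal the method would be a default on *TypeSerializerSnapshot* itself:

```
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Illustrative sketch only; in the FLIP this would be a default method on
// TypeSerializerSnapshot directly, so existing serializers are unaffected.
interface ElementMigratingSnapshot<T> extends TypeSerializerSnapshot<T> {

    // Migrates one serialized element from the old format to the new one.
    // ListState stores elements individually with delimiters, so the
    // migration loop invokes this once per element.
    default void migrateElement(
            TypeSerializer<T> priorSerializer,
            TypeSerializer<T> newSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out)
            throws IOException {
        // Default: plain deserialize-then-serialize, preserving today's
        // behavior. RowDataSerializerSnapshot would override this to add
        // nulls for new fields, reorder by field name, and recurse into
        // nested rows.
        T element = priorSerializer.deserialize(in);
        newSerializer.serialize(element, out);
    }
}
```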
Regarding the *migrateState* signature - I'm open to suggestions. My original design aimed to align with the existing implementations in RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example, in RocksDBMapState (v1.20), the migrateSerializedValue
<https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
method first deserializes the old value and then serializes it with the new serializer:

```
...
if (!isNull) {
    mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
}
...
newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
```

Under the proposal, *snapshotConfiguration().migrateState* would be called here as:

```
if (!isNull) {
    priorMapValueSerializer.snapshotConfiguration().migrateState(
        priorMapValueSerializer, newSerializer,
        serializedOldValueInput, serializedMigratedValueOutput);
}
```

The idea was to mirror this structure - delegate the migration logic to *priorSerializer.snapshotConfiguration()*, passing both the prior and new serializers.

On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Weiqing,
>
> Thanks for your answers!
>
> It seems a simple deserialization-serialization lacks flexibility, thus
> I'd agree to introduce new methods.
> I'd suggest changing the signature to:
> ```
> public void migrateState(
>         TypeSerializerSnapshot<T> oldSerializerSnapshot,
>         DataInputDeserializer in,
>         DataOutputSerializer out) throws IOException
> ```
> which is more aligned with other methods under `TypeSerializerSnapshot`.
> WDYT?
>
> And another question: Could you describe in which case we need
> `migrateElement`?
>
>
> Best,
> Zakelly
>
> On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your feedback.
> >
> > You're right - *resolveSchemaCompatibility* is critical for identifying
> > schema compatibility. However, our challenge extends beyond detection
> > to handling the actual migration process, particularly given RowData's
> > complex requirements.
> >
> > The standard migration logic in *AbstractRocksDBState* isn't sufficient
> > for RowData because, during migration, we need to:
> >
> > - Add null values for newly added fields
> > - Reorder fields based on field names in the new schema
> > - Recursively handle nested structures
> > - Apply different migration logic depending on the state type (e.g.,
> >   *ListState* uses *migrateElement()*, *MapState* uses *migrateState()*)
> >
> > The current approach:
> >
> > ```
> > V value = priorSerializer.deserialize(serializedOldValueInput);
> > newSerializer.serialize(value, serializedMigratedValueOutput);
> > ```
> >
> > doesn't offer enough control for these needs.
> >
> > The proposed *migrateState* and *migrateElement* methods maintain
> > backward compatibility with default implementations, while enabling
> > RowData to perform specialized migration logic without requiring
> > backend changes.
> >
> > I've updated the proposal document to include pseudo-code examples of
> > *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> > class to illustrate this. Let me know if I missed anything.
> >
> > Best regards,
> > Weiqing
> >
> > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com>
> > wrote:
> >
> > > Hi, Weiqing
> > >
> > > Thanks for the FLIP!
> > > In general I'd +1 for schema evolution for RowData types, which will
> > > enhance the user experience of SQL jobs.
> > >
> > > I have one question for now:
> > >
> > > You suggested introducing new methods in `TypeSerializerSnapshot`, but
> > > is it possible to leverage the existing state migration procedure[1],
> > > which also performs deserialization and serialization with the old and
> > > new serializers correspondingly? IIUC, all we need is to properly
> > > implement `resolveSchemaCompatibility` for
> > > `RowDataSerializerSnapshot`[2], since it will be invoked here[3]. No
> > > need for new methods, right?
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > [2]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > [3]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to initiate a discussion about enhancing state schema
> > > > evolution support for RowData in Flink.
> > > >
> > > > *Motivation*
> > > >
> > > > Flink applications frequently need to evolve their state schema as
> > > > business requirements change. Currently, when users update a Table
> > > > API or SQL job with schema changes involving RowData types
> > > > (particularly nested structures), they encounter serialization
> > > > compatibility errors during state restoration, causing job failures.
> > > > The issue occurs because the existing state migration mechanisms
> > > > don't properly handle RowData types during schema evolution,
> > > > preventing users from making backward-compatible changes like:
> > > >
> > > > - Adding nullable fields to existing structures
> > > > - Reordering fields within a row while preserving field names
> > > > - Evolving nested row structures
> > > >
> > > > This limitation impacts production applications using Flink's Table
> > > > API, as the RowData type is central to this interface. Users are
> > > > forced to choose between maintaining outdated schemas or
> > > > reprocessing all state data when schema changes are required.
> > > >
> > > > Here's the proposal document: Link
> > > > <https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0>
> > > >
> > > > Your feedback and ideas are welcome to refine this feature.
> > > >
> > > > Thanks,
> > > > Weiqing