Hi Weiqing,

Thanks for your answers!
It seems a simple deserialization-serialization round trip lacks flexibility, so I'd agree to introduce new methods. I'd suggest changing the signature to:

```
public void migrateState(
        TypeSerializerSnapshot<T> oldSerializerSnapshot,
        DataInputDeserializer in,
        DataOutputSerializer out) throws IOException
```

which is more aligned with the other methods under `TypeSerializerSnapshot`. WDYT?

And another question: could you describe in which cases we need `migrateElement`?

Best,
Zakelly

On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com> wrote:

> Hi Zakelly,
>
> Thanks for your feedback.
>
> You're right - *resolveSchemaCompatibility* is critical for identifying
> schema compatibility. However, our challenge extends beyond detection to
> handling the actual migration process, particularly given RowData's complex
> requirements.
>
> The standard migration logic in *AbstractRocksDBState* isn't sufficient for
> RowData because, during migration, we need to:
>
> - Add null values for newly added fields
> - Reorder fields based on field names in the new schema
> - Recursively handle nested structures
> - Apply different migration logic depending on the state type (e.g.,
>   *ListState* uses *migrateElement()*, *MapState* uses *migrateState()*)
>
> The current approach:
>
>     V value = priorSerializer.deserialize(serializedOldValueInput);
>     newSerializer.serialize(value, serializedMigratedValueOutput);
>
> doesn't offer enough control for these needs.
>
> The proposed *migrateState* and *migrateElement* methods maintain backward
> compatibility via default implementations, while enabling RowData to
> perform specialized migration logic without requiring backend changes.
>
> I've updated the proposal document to include pseudo-code examples of
> *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> class to illustrate this. Let me know if I missed anything.
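As a rough illustration of the fallback being discussed (read with the old serializer, re-write with the new one), here is a self-contained sketch. The `ToySerializer` interface and the plain `java.io` stream types are simplified stand-ins, not Flink's actual `TypeSerializer`, `DataInputDeserializer`, or `DataOutputSerializer`:

```java
import java.io.*;

public class MigrateStateSketch {
    // Toy stand-in for Flink's TypeSerializer (assumption, for illustration only).
    interface ToySerializer<T> {
        T deserialize(DataInputStream in) throws IOException;
        void serialize(T value, DataOutputStream out) throws IOException;
    }

    // Default behavior a migrateState(old, in, out) method could keep for
    // backward compatibility: deserialize with the old serializer, serialize
    // with the new one. Snapshots needing more control (e.g. for RowData)
    // would override this method.
    static <T> void migrateState(ToySerializer<T> oldSer,
                                 ToySerializer<T> newSer,
                                 DataInputStream in,
                                 DataOutputStream out) throws IOException {
        T value = oldSer.deserialize(in);
        newSer.serialize(value, out);
    }

    public static void main(String[] args) throws IOException {
        // Old format stores an int in 4 bytes; the new format widens it to 8.
        ToySerializer<Integer> oldSer = new ToySerializer<Integer>() {
            public Integer deserialize(DataInputStream in) throws IOException { return in.readInt(); }
            public void serialize(Integer v, DataOutputStream out) throws IOException { out.writeInt(v); }
        };
        ToySerializer<Integer> newSer = new ToySerializer<Integer>() {
            public Integer deserialize(DataInputStream in) throws IOException { return (int) in.readLong(); }
            public void serialize(Integer v, DataOutputStream out) throws IOException { out.writeLong(v); }
        };

        ByteArrayOutputStream oldBytes = new ByteArrayOutputStream();
        oldSer.serialize(42, new DataOutputStream(oldBytes));

        ByteArrayOutputStream migrated = new ByteArrayOutputStream();
        migrateState(oldSer, newSer,
                new DataInputStream(new ByteArrayInputStream(oldBytes.toByteArray())),
                new DataOutputStream(migrated));

        System.out.println(migrated.size()); // 8 bytes after migration
    }
}
```

This is exactly the control-flow Weiqing quotes from *AbstractRocksDBState*; the point of the new methods is that a snapshot can replace this body entirely rather than being limited to it.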
>
> Best regards,
> Weiqing
>
> On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi, Weiqing
> >
> > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> > types, which will enhance the user experience of SQL jobs.
> >
> > I have one question for now:
> >
> > You suggested introducing new methods in `TypeSerializerSnapshot`, but is
> > it possible to leverage the existing state migration procedure[1], which
> > also performs deserialization and serialization with the old and new
> > serializers respectively? IIUC, all we need is to properly implement
> > `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2], since it
> > will be invoked here[3]. No need for new methods, right?
> >
> > [1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > [2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > [3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> >
> > Best,
> > Zakelly
> >
> > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to initiate a discussion about enhancing state schema evolution
> > > support for RowData in Flink.
> > >
> > > *Motivation*
> > >
> > > Flink applications frequently need to evolve their state schema as
> > > business requirements change.
> > > Currently, when users update a Table API or SQL job with schema changes
> > > involving RowData types (particularly nested structures), they encounter
> > > serialization compatibility errors during state restoration, causing job
> > > failures. The issue occurs because existing state migration mechanisms
> > > don't properly handle RowData types during schema evolution, preventing
> > > users from making backward-compatible changes like:
> > >
> > > - Adding nullable fields to existing structures
> > > - Reordering fields within a row while preserving field names
> > > - Evolving nested row structures
> > >
> > > This limitation impacts production applications using Flink's Table API,
> > > as the RowData type is central to this interface. Users are forced to
> > > choose between maintaining outdated schemas or reprocessing all state
> > > data when schema changes are required.
> > >
> > > Here's the proposal document: Link
> > > <https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0>
> > >
> > > Your feedback and ideas are welcome to refine this feature.
> > >
> > > Thanks,
> > > Weiqing
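To make the three backward-compatible changes in the motivation concrete (add null for new fields, reorder by field name, recurse into nested rows), here is a hedged sketch. It models rows and schemas as name-to-value maps purely for illustration; Flink's RowData is positional, so a real implementation would pair values with the field names taken from the old and new RowType, and this is not the pseudo-code from the proposal document:

```java
import java.util.*;

public class RowDataMigrationSketch {
    // A "row" is a name -> value map; a "schema" is a name -> nested-schema map,
    // where a non-null Map value marks a nested row field and null marks a leaf.
    // (Both representations are assumptions made for this sketch.)
    @SuppressWarnings("unchecked")
    static Map<String, Object> migrateRow(Map<String, Object> oldRow,
                                          Map<String, Object> newSchema) {
        Map<String, Object> migrated = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : newSchema.entrySet()) {
            String name = field.getKey();
            Object oldValue = oldRow.get(name);
            if (field.getValue() instanceof Map && oldValue instanceof Map) {
                // Recursively handle nested row structures.
                migrated.put(name, migrateRow((Map<String, Object>) oldValue,
                                              (Map<String, Object>) field.getValue()));
            } else {
                // Fields are emitted in new-schema order; fields absent in the
                // old row (newly added nullable fields) come out as null.
                migrated.put(name, oldValue);
            }
        }
        return migrated;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRow = new LinkedHashMap<>();
        oldRow.put("id", 1);
        Map<String, Object> addr = new LinkedHashMap<>();
        addr.put("city", "Berlin");
        oldRow.put("address", addr);

        // New schema: reorders fields, adds nullable "name" and nested "zip".
        Map<String, Object> newSchema = new LinkedHashMap<>();
        Map<String, Object> addrSchema = new LinkedHashMap<>();
        addrSchema.put("zip", null);
        addrSchema.put("city", null);
        newSchema.put("address", addrSchema);
        newSchema.put("name", null);
        newSchema.put("id", null);

        System.out.println(migrateRow(oldRow, newSchema));
        // {address={zip=null, city=Berlin}, name=null, id=1}
    }
}
```

In the proposed design this kind of logic would live inside `migrateState`/`migrateElement` overrides on *RowDataSerializerSnapshot*, operating on serialized bytes rather than in-memory maps.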