Hi Zakelly,

Thanks for your feedback.
You're right that *resolveSchemaCompatibility* is critical for identifying schema compatibility. However, our challenge extends beyond detection to handling the actual migration, particularly given RowData's complex requirements. The standard migration logic in *AbstractRocksDBState* isn't sufficient for RowData because, during migration, we need to:

- Add null values for newly added fields
- Reorder fields based on field names in the new schema
- Recursively handle nested structures
- Apply different migration logic depending on the state type (e.g., *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())

The current approach:

    V value = priorSerializer.deserialize(serializedOldValueInput);
    newSerializer.serialize(value, serializedMigratedValueOutput);

doesn't offer enough control for these needs: each value is deserialized with the prior serializer and immediately re-serialized with the new one, leaving no point at which to remap fields. The proposed *migrateState* and *migrateElement* methods maintain backward compatibility through default implementations, while enabling RowData to perform specialized migration logic without requiring backend changes.

I've updated the proposal document to include pseudo-code examples of *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot* class to illustrate this.

Let me know if I missed anything.

Best regards,
Weiqing

On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi, Weiqing
>
> Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> types, which will enhance the user experience of SQL jobs.
>
> I have one question for now:
>
> You suggested introducing new methods in `TypeSerializerSnapshot`, but is
> it possible to leverage existing state migration procedure[1], which also
> performs deserialization and serialization with old and new serializer
> correspondingly. IIUC, all we need is to properly implement
> `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2] since it
> will be invoked here[3]. No need for new methods, right?
>
>
> [1]
> https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> [2]
> https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> [3]
> https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
>
>
> Best,
> Zakelly
>
> On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'd like to initiate a discussion about enhancing state schema evolution
> > support for RowData in Flink.
> >
> > *Motivation*
> >
> > Flink applications frequently need to evolve their state schema as
> > business requirements change. Currently, when users update a Table API or
> > SQL job with schema changes involving RowData types (particularly nested
> > structures), they encounter serialization compatibility errors during
> > state restoration, causing job failures. The issue occurs because existing
> > state migration mechanisms don't properly handle RowData types during
> > schema evolution, preventing users from making backward-compatible changes
> > like:
> >
> > - Adding nullable fields to existing structures
> > - Reordering fields within a row while preserving field names
> > - Evolving nested row structures
> >
> > This limitation impacts production applications using Flink's Table API,
> > as the RowData type is central to this interface. Users are forced to
> > choose between maintaining outdated schemas and reprocessing all state
> > data when schema changes are required.
> >
> > Here's the proposal document:
> > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> >
> > Your feedback and ideas are welcome to refine this feature.
> >
> > Thanks,
> > Weiqing
> >
>
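The field-level migration steps listed in the reply above (null for added fields, name-based reordering, recursion into nested rows, per-element handling for list-style state) can be sketched in plain Java. This is a hypothetical, self-contained sketch: it does not use Flink's RowData, RowDataSerializer, or the proposed migrateState/migrateElement signatures, which are not spelled out in this thread. Rows are modeled as Object[], and RowSchema, migrateRow, and migrateList are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of name-based row migration; not Flink API. */
final class RowMigrationSketch {

    /** Hypothetical schema: field names plus a nested schema per field (null for non-row fields). */
    static final class RowSchema {
        final List<String> names;
        final List<RowSchema> nested;

        RowSchema(List<String> names, List<RowSchema> nested) {
            if (names.size() != nested.size()) {
                throw new IllegalArgumentException("names and nested must have the same size");
            }
            this.names = names;
            this.nested = nested;
        }

        /** Schema whose fields are all non-row (atomic) values. */
        static RowSchema flat(String... names) {
            return new RowSchema(Arrays.asList(names), Collections.nCopies(names.length, (RowSchema) null));
        }
    }

    /**
     * Migrates one row (modeled as Object[]) from oldSchema to newSchema:
     * fields are matched by name, fields absent from oldSchema are set to null,
     * and fields that are rows in both schemas are migrated recursively.
     * Type changes of an individual field are out of scope for this sketch.
     */
    static Object[] migrateRow(Object[] oldRow, RowSchema oldSchema, RowSchema newSchema) {
        if (oldRow == null) {
            return null; // a null row stays null
        }
        Object[] newRow = new Object[newSchema.names.size()];
        for (int i = 0; i < newRow.length; i++) {
            int oldIdx = oldSchema.names.indexOf(newSchema.names.get(i));
            if (oldIdx < 0) {
                newRow[i] = null; // newly added field; assumed nullable
                continue;
            }
            Object value = oldRow[oldIdx];
            RowSchema oldNested = oldSchema.nested.get(oldIdx);
            RowSchema newNested = newSchema.nested.get(i);
            if (oldNested != null && newNested != null) {
                // Nested row in both schemas: apply the same rules one level down.
                value = migrateRow((Object[]) value, oldNested, newNested);
            }
            newRow[i] = value;
        }
        return newRow;
    }

    /** ListState-style migration: each element is migrated independently. */
    static List<Object[]> migrateList(List<Object[]> oldElements, RowSchema oldSchema, RowSchema newSchema) {
        List<Object[]> migrated = new ArrayList<>(oldElements.size());
        for (Object[] element : oldElements) {
            migrated.add(migrateRow(element, oldSchema, newSchema));
        }
        return migrated;
    }
}
```

For example, migrating an old row (id=1, inner=(x="X", y="Y")) to a schema ordered (inner(y, x, z), id, note) yields (("Y", "X", null), 1, null). Under the proposal, logic of this kind would presumably sit in overrides on the RowData serializer snapshot, while a default implementation keeps today's deserialize-then-serialize behavior for all other types.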