Hi Weiqing,

Thanks for your answers!

A plain deserialize-then-serialize round trip does seem to lack the
flexibility you need, so I agree with introducing new methods.
I'd suggest changing the signature to:
```
public void migrateState(
        TypeSerializerSnapshot<T> oldSerializerSnapshot,
        DataInputDeserializer in,
        DataOutputSerializer out) throws IOException
```
which is more aligned with other methods under `TypeSerializerSnapshot`.
WDYT?
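
To make the suggestion concrete, here is a minimal sketch of how a default
implementation with this shape could keep today's behavior. It is only an
illustration, not the FLIP's code: `Serializer` and `Snapshot` are
hypothetical stand-ins for `TypeSerializer` and `TypeSerializerSnapshot`, and
plain `java.io.DataInput`/`DataOutput` replace `DataInputDeserializer`/
`DataOutputSerializer` so that it compiles without Flink. The int-to-long
widening is an invented example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical stand-ins only; none of these types are Flink classes.
final class MigrateStateSketch {

    interface Serializer<T> {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    interface Snapshot<T> {
        Serializer<T> restoreSerializer();

        // Default: the plain round trip (read with the old serializer,
        // write with the new one). Snapshots needing more control override it.
        default void migrateState(
                Snapshot<T> oldSerializerSnapshot,
                DataInput in,
                DataOutput out) throws IOException {
            T value = oldSerializerSnapshot.restoreSerializer().deserialize(in);
            restoreSerializer().serialize(value, out);
        }
    }

    // Invented old format: the value is stored as a 4-byte int.
    static final Snapshot<Long> OLD = () -> new Serializer<Long>() {
        public void serialize(Long v, DataOutput out) throws IOException {
            out.writeInt(v.intValue());
        }
        public Long deserialize(DataInput in) throws IOException {
            return (long) in.readInt();
        }
    };

    // Invented new format: the value is stored as an 8-byte long.
    static final Snapshot<Long> NEW = () -> new Serializer<Long>() {
        public void serialize(Long v, DataOutput out) throws IOException {
            out.writeLong(v);
        }
        public Long deserialize(DataInput in) throws IOException {
            return in.readLong();
        }
    };

    // Migrates one serialized value from the old format to the new one.
    static byte[] migrate(byte[] oldBytes) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            NEW.migrateState(
                    OLD,
                    new DataInputStream(new ByteArrayInputStream(oldBytes)),
                    new DataOutputStream(buffer));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // prints [0, 0, 0, 0, 0, 0, 0, 7]
        System.out.println(Arrays.toString(migrate(new byte[] {0, 0, 0, 7})));
    }
}
```

A snapshot such as `RowDataSerializerSnapshot` could then override
`migrateState` for its specialized logic, while all other snapshots inherit
the round trip.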

And another question: Could you describe in which case we need
`migrateElement`?
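
Also, to check that I read the RowData requirements correctly (null for added
fields, reorder by field name, recurse into nested rows), this is roughly the
per-row logic I have in mind. It is a sketch on plain `Object[]` rows,
assuming fields are matched purely by name; `Schema` and `migrateRow` are
hypothetical names, not Flink APIs, and type changes are not handled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not based on RowData or any Flink class.
final class RowMigrationSketch {

    // Field names plus, per field, a nested schema (null for a leaf field).
    static final class Schema {
        final List<String> names;
        final List<Schema> nested;

        Schema(List<String> names, List<Schema> nested) {
            this.names = names;
            this.nested = nested;
        }

        // Convenience factory for a schema whose fields are all leaves.
        static Schema flat(String... names) {
            return new Schema(
                    Arrays.asList(names),
                    Collections.<Schema>nCopies(names.length, null));
        }
    }

    static Object[] migrateRow(Object[] oldRow, Schema oldSchema, Schema newSchema) {
        // A fresh Object[] is all nulls, so added fields need no extra work.
        Object[] newRow = new Object[newSchema.names.size()];
        for (int i = 0; i < newRow.length; i++) {
            // Locate the field by name so that reordering is handled.
            int oldPos = oldSchema.names.indexOf(newSchema.names.get(i));
            if (oldPos < 0) {
                continue; // newly added field: stays null
            }
            Object value = oldRow[oldPos];
            Schema oldNested = oldSchema.nested.get(oldPos);
            Schema newNested = newSchema.nested.get(i);
            if (value != null && oldNested != null && newNested != null) {
                // Nested row: apply the same rules one level down.
                value = migrateRow((Object[]) value, oldNested, newNested);
            }
            newRow[i] = value;
        }
        return newRow;
    }

    public static void main(String[] args) {
        Object[] migrated = migrateRow(
                new Object[] {1, "a"},
                Schema.flat("id", "name"),
                Schema.flat("name", "age", "id"));
        System.out.println(Arrays.toString(migrated)); // prints [a, null, 1]
    }
}
```

Note that fields present only in the old schema are silently dropped here; a
real implementation would presumably reject that case in
`resolveSchemaCompatibility` first.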


Best,
Zakelly

On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
wrote:

> Hi Zakelly,
>
> Thanks for your feedback.
>
> You're right - *resolveSchemaCompatibility* is critical for identifying
> schema compatibility. However, our challenge extends beyond detection to
> handling the actual migration process, particularly given RowData’s complex
> requirements.
>
> The standard migration logic in *AbstractRocksDBState* isn't sufficient for
> RowData because, during migration, we need to:
>
>    - Add null values for newly added fields
>    - Reorder fields based on field names in the new schema
>    - Recursively handle nested structures
>    - Apply different migration logic depending on the state type (e.g.,
>    *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())
>
>
> The current approach:
>
>     V value = priorSerializer.deserialize(serializedOldValueInput);
>     newSerializer.serialize(value, serializedMigratedValueOutput);
>
> doesn’t offer enough control for these needs.
>
> The proposed *migrateState* and *migrateElement* methods maintain backward
> compatibility with default implementations, while enabling RowData to
> perform specialized migration logic without requiring backend changes.
>
> I’ve updated the proposal document to include pseudo-code examples of
> *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> class to illustrate this. Let me know if I missed anything.
>
> Best regards,
> Weiqing
>
> On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi, Weiqing
> >
> > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> > types, which will enhance the user experience of SQL jobs.
> >
> > I have one question for now:
> >
> > You suggested introducing new methods in `TypeSerializerSnapshot`, but is
> > it possible to leverage the existing state migration procedure[1], which
> > already performs deserialization and serialization with the old and new
> > serializers respectively? IIUC, all we need is to properly implement
> > `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2], since it
> > will be invoked here[3]. No need for new methods, right?
> >
> >
> > [1]
> > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > [2]
> > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > [3]
> > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> >
> >
> > Best,
> > Zakelly
> >
> > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I’d like to initiate a discussion about enhancing state schema evolution
> > > support for RowData in Flink.
> > >
> > > *Motivation*
> > >
> > > Flink applications frequently need to evolve their state schema as business
> > > requirements change. Currently, when users update a Table API or SQL job
> > > with schema changes involving RowData types (particularly nested
> > > structures), they encounter serialization compatibility errors during state
> > > restoration, causing job failures. The issue occurs because existing state
> > > migration mechanisms don't properly handle RowData types during schema
> > > evolution, preventing users from making backward-compatible changes like:
> > >
> > >    - Adding nullable fields to existing structures
> > >    - Reordering fields within a row while preserving field names
> > >    - Evolving nested row structures
> > >
> > > This limitation impacts production applications using Flink's Table API, as
> > > the RowData type is central to this interface. Users are forced to choose
> > > between maintaining outdated schemas or reprocessing all state data when
> > > schema changes are required.
> > >
> > > Here’s the proposal document:
> > > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> > > Your feedback and ideas are welcome to refine this feature.
> > >
> > > Thanks,
> > > Weiqing
> > >
> >
>
