Thanks for the suggestions, Zakelly!

Regarding *migrateElement* - it is specifically needed for ListState, which
stores elements individually with delimiters. Its implementation
deserializes and processes each element one by one during migration, so I
introduced the *migrateElement* API to handle this per-element processing.
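
To make the per-element idea concrete, here is a minimal, self-contained sketch. It does not use Flink classes: `ElementSerializer`, `INT_FORMAT`, `LONG_FORMAT`, `migrateList`, and the `,` delimiter byte are hypothetical names chosen for illustration, and the `migrateElement` hook below only approximates the shape of the proposed API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ListMigrationSketch {
    // Assumed delimiter between serialized list elements (illustrative only).
    static final byte DELIMITER = ',';

    /** Hypothetical stand-in for a serializer; not Flink's TypeSerializer. */
    interface ElementSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    /** Old format: each element is a 4-byte int. */
    static final ElementSerializer<Integer> INT_FORMAT = new ElementSerializer<Integer>() {
        public void serialize(Integer v, DataOutputStream out) throws IOException { out.writeInt(v); }
        public Integer deserialize(DataInputStream in) throws IOException { return in.readInt(); }
    };

    /** New format: each element is widened to an 8-byte long. */
    static final ElementSerializer<Integer> LONG_FORMAT = new ElementSerializer<Integer>() {
        public void serialize(Integer v, DataOutputStream out) throws IOException { out.writeLong(v); }
        public Integer deserialize(DataInputStream in) throws IOException { return (int) in.readLong(); }
    };

    /** Hypothetical per-element hook: read one element in the old format, write it in the new one. */
    static <T> void migrateElement(ElementSerializer<T> prior, ElementSerializer<T> next,
            DataInputStream in, DataOutputStream out) throws IOException {
        next.serialize(prior.deserialize(in), out);
    }

    /** Walks the buffer element by element, skipping and re-emitting the delimiter in between. */
    static <T> byte[] migrateList(byte[] oldBytes, ElementSerializer<T> prior,
            ElementSerializer<T> next) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes));
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        boolean first = true;
        while (in.available() > 0) {
            if (!first) {
                in.readByte();            // consume the delimiter before the next element
                out.writeByte(DELIMITER); // keep the same layout in the migrated bytes
            }
            migrateElement(prior, next, in, out);
            first = false;
        }
        out.flush();
        return buffer.toByteArray();
    }
}
```

The point of the sketch is only that the backend loop owns the delimiter handling, while the per-element hook owns the conversion of a single element.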

Regarding the *migrateState* signature - I’m open to suggestions. My
original design aimed to align with the existing implementations in
RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example,
in RocksDBMapState (v1.20), the migrateSerializedValue
<https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
method first deserializes the old value and then serializes it with the new
serializer:

    ...
    if (!isNull) {
        mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
    }
    ...
    newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);

With the proposed API, *snapshotConfiguration().migrateState* would be called as:

    if (!isNull) {
        priorMapValueSerializer.snapshotConfiguration().migrateState(
                priorMapValueSerializer, newSerializer,
                serializedOldValueInput, serializedMigratedValueOutput);
    }

The idea was to mirror this structure - delegate the migration logic to
*priorSerializer.snapshotConfiguration()*, passing both the prior and new
serializers.
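
As a rough illustration of that delegation, here is a self-contained sketch that does not depend on Flink. `Snapshot`, `Serializer`, `RowSerializer`, and `migrate` are hypothetical, simplified stand-ins (not the real `TypeSerializerSnapshot` or `TypeSerializer`), and the toy row format (a null flag followed by an int per field) is an assumption made only for the example. The default `migrateState` keeps the plain deserialize-then-serialize behavior; the row snapshot overrides it to pad newly appended fields with nulls.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class MigrateStateSketch {

    /** Hypothetical, simplified stand-in for a serializer snapshot. */
    interface Snapshot<T> {
        // Default: deserialize with the prior serializer, serialize with the new one.
        default void migrateState(Serializer<T> prior, Serializer<T> next,
                DataInputStream in, DataOutputStream out) throws IOException {
            next.serialize(prior.deserialize(in), out);
        }
    }

    /** Hypothetical, simplified stand-in for a serializer. */
    interface Serializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
        Snapshot<T> snapshotConfiguration();
    }

    /** Toy row serializer: each field is a null flag, followed by an int when present. */
    static final class RowSerializer implements Serializer<Integer[]> {
        final int arity;

        RowSerializer(int arity) { this.arity = arity; }

        public void serialize(Integer[] row, DataOutputStream out) throws IOException {
            for (int i = 0; i < arity; i++) {
                out.writeBoolean(row[i] == null);
                if (row[i] != null) out.writeInt(row[i]);
            }
        }

        public Integer[] deserialize(DataInputStream in) throws IOException {
            Integer[] row = new Integer[arity];
            for (int i = 0; i < arity; i++) {
                row[i] = in.readBoolean() ? null : in.readInt();
            }
            return row;
        }

        public Snapshot<Integer[]> snapshotConfiguration() {
            // Specialized migration: only handles fields appended at the end of the row.
            return new Snapshot<Integer[]>() {
                @Override
                public void migrateState(Serializer<Integer[]> prior, Serializer<Integer[]> next,
                        DataInputStream in, DataOutputStream out) throws IOException {
                    Integer[] old = prior.deserialize(in);
                    int newArity = ((RowSerializer) next).arity;
                    next.serialize(Arrays.copyOf(old, newArity), out); // new slots are null
                }
            };
        }
    }

    /** Caller side, shaped like the call quoted above. */
    static byte[] migrate(byte[] oldBytes, RowSerializer prior, RowSerializer next)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes));
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        prior.snapshotConfiguration().migrateState(prior, next, in, out);
        out.flush();
        return buffer.toByteArray();
    }
}
```

In this sketch the caller never changes; only the snapshot decides whether the default or a specialized migration runs, which is the property the delegation is meant to give the state backends.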

On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Weiqing,
>
> Thanks for your answers!
>
> It seems that a simple deserialize-then-serialize pass lacks flexibility,
> so I agree with introducing new methods.
> I'd suggest changing the signature to:
> ```
>  public void migrateState(
>                TypeSerializerSnapshot<T> oldSerializerSnapshot,
>                DataInputDeserializer in,
>                DataOutputSerializer out) throws IOException
> ```
> which is more aligned with other methods under `TypeSerializerSnapshot`.
> WDYT?
>
> And another question: Could you describe in which case we need
> `migrateElement`?
>
>
> Best,
> Zakelly
>
> On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your feedback.
> >
> > You're right - *resolveSchemaCompatibility* is critical for identifying
> > schema compatibility. However, our challenge extends beyond detection to
> > handling the actual migration process, particularly given RowData’s
> > complex requirements.
> >
> > The standard migration logic in *AbstractRocksDBState* isn't sufficient
> > for RowData because, during migration, we need to:
> >
> >    - Add null values for newly added fields
> >    - Reorder fields based on field names in the new schema
> >    - Recursively handle nested structures
> >    - Apply different migration logic depending on the state type (e.g.,
> >    *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())
> >
> >
> >
> > The current approach:
> >
> >
> >     V value = priorSerializer.deserialize(serializedOldValueInput);
> >     newSerializer.serialize(value, serializedMigratedValueOutput);
> >
> > doesn’t offer enough control for these needs.
> >
> > The proposed *migrateState* and *migrateElement* methods maintain
> > backward compatibility with default implementations, while enabling
> > RowData to perform specialized migration logic without requiring backend
> > changes.
> >
> > I’ve updated the proposal document to include pseudo-code examples of
> > *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> > class to illustrate this. Let me know if I missed anything.
> >
> > Best regards,
> > Weiqing
> >
> > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com>
> > wrote:
> >
> > > Hi, Weiqing
> > >
> > > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> > > types, which will enhance the user experience of SQL jobs.
> > >
> > > I have one question for now:
> > >
> > > You suggested introducing new methods in `TypeSerializerSnapshot`, but
> > > is it possible to leverage the existing state migration procedure[1],
> > > which also performs deserialization and serialization with the old and
> > > new serializers, respectively? IIUC, all we need is to properly implement
> > > `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2] since it
> > > will be invoked here[3]. No need for new methods, right?
> > >
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > [2]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > [3]
> > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I’d like to initiate a discussion about enhancing state schema
> > > > evolution support for RowData in Flink.
> > > >
> > > > *Motivation*
> > > >
> > > > Flink applications frequently need to evolve their state schema as
> > > > business requirements change. Currently, when users update a Table API
> > > > or SQL job with schema changes involving RowData types (particularly
> > > > nested structures), they encounter serialization compatibility errors
> > > > during state restoration, causing job failures. The issue occurs because
> > > > existing state migration mechanisms don't properly handle RowData types
> > > > during schema evolution, preventing users from making
> > > > backward-compatible changes like:
> > > >
> > > >    - Adding nullable fields to existing structures
> > > >    - Reordering fields within a row while preserving field names
> > > >    - Evolving nested row structures
> > > >
> > > > This limitation impacts production applications using Flink's Table
> > > > API, as the RowData type is central to this interface. Users are forced
> > > > to choose between maintaining outdated schemas or reprocessing all state
> > > > data when schema changes are required.
> > > >
> > > > Here’s the proposal document: Link
> > > > <https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0>
> > > > Your feedback and ideas are welcome to refine this feature.
> > > >
> > > > Thanks,
> > > > Weiqing
> > > >
> > >
> >
>
