Hi Shengkai, Hongshun,

I’ve added one of our job examples here to help illustrate the schema evolution scenario in practice: Example Job <https://docs.google.com/document/d/1BizfC79gxa01bPJNjXhbShoMhDU7yZYIcEliVrD38Rw/edit?tab=t.0>
(Note: Some SQL logic and schema details are redacted.)

@Shengkai I’ll make sure the finalized design is reflected in the cwiki. Would you recommend keeping the Google doc and cwiki in sync during the discussion phase, or is updating the cwiki after finalizing the proposal sufficient?

On the opt-in configuration (state.schema-evolution.enable), I’m also re-evaluating whether it’s necessary. I’d love your input here: What would be the practical behavioral difference in your example job with vs. without the config?

Thanks again,
Weiqing

On Wed, Jul 23, 2025 at 8:22 PM Hongshun Wang <[email protected]> wrote:

> Hi Weiqing,
> I like the idea. Could you give an example of how a Kafka or MySQL connector uses it to read data with different schemas, for better understanding?
>
> Best,
> Hongshun
>
> On Thu, Jul 24, 2025 at 10:34 AM Shengkai Fang <[email protected]> wrote:
>
> > Hi, Weiqing.
> >
> > Thanks for the update. It would be better to update the cwiki rather than the Google doc.
> >
> > After reading the doc, I feel this feature is not practical for users, because few users understand the SQL operator state structure, and in some cases the operator state structure is related to the input operator schema (e.g. Join Operator). My suggestion is that the FLIP should focus on the case where the input schema changes and the SQL is unchanged.
> >
> > If we assume the SQL is unchanged and the input schema changes, in most cases the planner will prune the unused columns. Could you give me a detailed example where name-based state migration works?
> >
> > Best,
> > Shengkai
> >
> > Weiqing Yang <[email protected]> wrote on Wed, Jul 23, 2025 at 12:03:
> >
> > > Hi Shengkai,
> > >
> > > Thank you for your detailed feedback!
> > >
> > > I've updated the proposal to incorporate your suggestions:
> > >
> > > 1. fieldNames vs. originalRowType: I agree that using fieldNames: String[] is a more focused and lightweight approach than storing the full originalRowType. The proposal has been updated accordingly.
> > > 2. Potential risks with name-based mapping: You raised a good point regarding risks in scenarios such as SQL query changes on aggregation. To mitigate this, I've introduced a configuration option state.schema-evolution.enable (default: false), allowing users to explicitly opt in when they’ve verified that schema evolution is safe for their use case.
> > > 3. Examples clarification: To better illustrate the boundary between supported and unsupported changes, I’ve expanded the examples section. It now clearly distinguishes supported cases (e.g., adding nullable fields, reordering) from unsupported ones (e.g., removing fields, changing field types).
> > > 4. User perspective: I’ve also updated the proposal to clarify when schema evolution is supported and when it isn’t, along with the expected user workflow. This should help make the feature easier to understand and adopt.
> > >
> > > Please let me know if you have any further suggestions or questions.
> > >
> > > Thanks again for your valuable input.
> > >
> > > Best,
> > > Weiqing
> > >
> > > On Sun, Jul 20, 2025 at 10:14 PM Shengkai Fang <[email protected]> wrote:
> > >
> > > > Hi Weiqing.
> > > >
> > > > +1 for the FLIP. I have some suggestions about the FLIP:
> > > >
> > > > 1. Compared to adding a field named originalRowType in `RowDataSerializer`, I prefer to add a field named fieldNames with type String[]. WDYT? I think this field is used for name-based field mapping, so we just add the required names to the serializer.
> > > >
> > > > 2. This FLIP shows that if the state corresponding to an operator's ID is retained in the state backend, the operator restores the state according to name-based field mapping. But I think name-based field mapping is dangerous in some cases.
> > > > For example, users have an aggregation Aggregate(groupBy(a, b), SUM(c)) in their job. After modification, the job has Aggregate(groupBy(a, b), SUM(d), SUM(c)). In this case, after the modification SUM(d)'s agg buffer name is agg0_max and SUM(c)'s is agg1_max, but before the modification SUM(c)'s agg buffer name was agg0_max. How about introducing an option to let users determine whether to enable schema migration?
> > > >
> > > > 3. The example in the FLIP is not ideal, because the current proposal still cannot migrate state if a field type changes, right? The FLIP states: "Incompatible changes (removing fields, changing field types) will still be rejected with clear error messages".
> > > >
> > > > 4. It is better to clarify when schema migration works and when it does not from the user's perspective. The current proposal mainly focuses on the operator perspective, which may not be user-friendly.
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > > Weiqing Yang <[email protected]> wrote on Fri, Jul 18, 2025 at 14:42:
> > > >
> > > > > Hi Zakelly and Hangxiang,
> > > > >
> > > > > Just checking in - do you have any concerns or feedback?
> > > > >
> > > > > If there are no further objections from anyone, I’ll mark the FLIP as ready for voting.
> > > > >
> > > > > Best,
> > > > > Weiqing
> > > > >
> > > > > On Sun, Jul 6, 2025 at 11:46 PM Weiqing Yang <[email protected]> wrote:
> > > > >
> > > > > > Hi Hangxiang, Zakelly,
> > > > > >
> > > > > > Thank you for the careful review and the +1 on the proposal.
> > > > > >
> > > > > > *1. Where to host the migration logic*
> > > > > >
> > > > > > I experimented with placing the migration hook on TypeSerializerSchemaCompatibility, but ran into two issues:
> > > > > >
> > > > > > - Is the "schemaEvolutionSerializer" intended to be the new TypeSerializer? The migration needs access to both the DataInputDeserializer (the value) and the new TypeSerializer.
> > > > > > - TypeSerializerSchemaCompatibility is currently designed as a result holder, not an executor, so keeping the procedural logic inside TypeSerializerSnapshot seems clearer.
> > > > > >
> > > > > > *2. Naming the snapshot field*
> > > > > >
> > > > > > I can change the field to `oldSerializerSnapshot` for consistency with `resolveSchemaCompatibility()`, if you think that’s clearer. Note that migrateState() will still require the new serializer, so the method signature will remain migrateState(oldSnapshot, newSerializer, dataInput, ...).
> > > > > >
> > > > > > *3. Need for migrateElement()*
> > > > > >
> > > > > > I initially tried relying only on migrateState(), but for RocksDBListState the code became much less clean, as it stores list elements individually with delimiters. A dedicated migrateElement() method keeps that migration logic more readable (and also slightly improves performance) for ListState.
> > > > > >
> > > > > > For context, I cherry-picked our internal PR (used in production on Flink v1.16) that illustrates these points in practice:
> > > > > >
> > > > > > https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842
> > > > > >
> > > > > > I’m happy to iterate further - please let me know your thoughts.
> > > > > >
> > > > > > Thanks again!
> > > > > > Weiqing
> > > > > >
> > > > > > On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <[email protected]> wrote:
> > > > > >
> > > > > >> Hi, Weiqing.
> > > > > >> Thanks for driving this FLIP.
> > > > > >> I'm +1 for supporting schema evolution for SQL RowData type.
> > > > > >>
> > > > > >> I just have some questions:
> > > > > >> 1. Could we consider defining a method returning *SchemaEvolutionSerializer* in *TypeSerializerSchemaCompatibility* (like compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))? Then we would only need to implement schema evolution in the form of a serializer (which could also be supported automatically, since we implement it internally and call it in the internal state). I think it may be better because *TypeSerializerSnapshot* is a common interface and many of its implementations may not really need *migrateState*.
> > > > > >>
> > > > > >> 2. Considering the semantics of *TypeSerializerSnapshot*, I'd also suggest changing the field to *oldSerializerSnapshot*, which is also consistent with *resolveSchemaCompatibility*.
> > > > > >>
> > > > > >> 3. Do we really need an extra *migrateElement* method? Or, if we go with the option of defining *SchemaEvolutionSerializer*, could the element schema evolution serializer just be a special *SchemaEvolutionSerializer*?
> > > > > >>
> > > > > >> On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <[email protected]> wrote:
> > > > > >>
> > > > > >> > Thanks for the suggestions, Zakelly!
> > > > > >> >
> > > > > >> > Regarding *migrateElement* - it is specifically needed for ListState, which stores elements individually with delimiters. Its implementation deserializes and processes each element one by one during migration, so I introduced the *migrateElement* API to handle this per-element processing.
> > > > > >> >
> > > > > >> > Regarding the *migrateState* signature - I’m open to suggestions. My original design aimed to align with the existing implementations in RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example, in RocksDBMapState (v1.20), the migrateSerializedValue <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243> method first deserializes the old value and then serializes it with the new serializer:
> > > > > >> >
> > > > > >> >     ...
> > > > > >> >     if (!isNull) {
> > > > > >> >         mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
> > > > > >> >     }
> > > > > >> >     ...
> > > > > >> >     newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
> > > > > >> >
> > > > > >> > Here, *snapshotConfiguration.migrateState* is called as:
> > > > > >> >
> > > > > >> >     if (!isNull) {
> > > > > >> >         priorMapValueSerializer.snapshotConfiguration().migrateState(
> > > > > >> >             priorMapValueSerializer, newSerializer,
> > > > > >> >             serializedOldValueInput, serializedMigratedValueOutput);
> > > > > >> >     }
> > > > > >> >
> > > > > >> > The idea was to mirror this structure - delegate the migration logic to *priorSerializer.snapshotConfiguration()*, passing both the prior and new serializers.
> > > > > >> >
> > > > > >> > On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <[email protected]> wrote:
> > > > > >> >
> > > > > >> > > Hi Weiqing,
> > > > > >> > >
> > > > > >> > > Thanks for your answers!
> > > > > >> > >
> > > > > >> > > It seems a simple deserialization-serialization lacks flexibility, thus I'd agree to introduce new methods.
> > > > > >> > > I'd suggest changing the signature to:
> > > > > >> > > ```
> > > > > >> > > public void migrateState(
> > > > > >> > >         TypeSerializerSnapshot<T> oldSerializerSnapshot,
> > > > > >> > >         DataInputDeserializer in,
> > > > > >> > >         DataOutputSerializer out) throws IOException
> > > > > >> > > ```
> > > > > >> > > which is more aligned with other methods under `TypeSerializerSnapshot`. WDYT?
> > > > > >> > >
> > > > > >> > > And another question: Could you describe in which case we need `migrateElement`?
> > > > > >> > >
> > > > > >> > > Best,
> > > > > >> > > Zakelly
> > > > > >> > >
> > > > > >> > > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <[email protected]> wrote:
> > > > > >> > >
> > > > > >> > > > Hi Zakelly,
> > > > > >> > > >
> > > > > >> > > > Thanks for your feedback.
> > > > > >> > > >
> > > > > >> > > > You're right - *resolveSchemaCompatibility* is critical for identifying schema compatibility. However, our challenge extends beyond detection to handling the actual migration process, particularly given RowData’s complex requirements.
> > > > > >> > > >
> > > > > >> > > > The standard migration logic in *AbstractRocksDBState* isn't sufficient for RowData because, during migration, we need to:
> > > > > >> > > >
> > > > > >> > > > - Add null values for newly added fields
> > > > > >> > > > - Reorder fields based on field names in the new schema
> > > > > >> > > > - Recursively handle nested structures
> > > > > >> > > > - Apply different migration logic depending on the state type (e.g., *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())
> > > > > >> > > >
> > > > > >> > > > The current approach:
> > > > > >> > > >
> > > > > >> > > >     V value = priorSerializer.deserialize(serializedOldValueInput);
> > > > > >> > > >     newSerializer.serialize(value, serializedMigratedValueOutput);
> > > > > >> > > >
> > > > > >> > > > doesn’t offer enough control for these needs.
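As a rough illustration of the first two requirements in the list above (null-filling newly added fields and reordering by field name), here is a minimal sketch in plain Java. It is not the FLIP's implementation and uses no Flink classes: `NameBasedRowMigration` and `migrate` are hypothetical names, a row is modeled as a plain `Object[]`, and nested rows, type checks, and nullability checks are deliberately left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Flink or the FLIP. */
final class NameBasedRowMigration {

    /**
     * Remaps one row from the old field order to the new one by field name.
     * Fields that exist only in the new schema are left as null.
     * A field that exists only in the old schema counts as a removal and is rejected.
     */
    static Object[] migrate(String[] oldNames, Object[] oldValues, String[] newNames) {
        if (oldNames.length != oldValues.length) {
            throw new IllegalArgumentException("oldNames and oldValues differ in length");
        }
        // Index the new schema by field name.
        Map<String, Integer> newIndex = new HashMap<>();
        for (int i = 0; i < newNames.length; i++) {
            newIndex.put(newNames[i], i);
        }
        // Java initializes the array with nulls, which covers newly added fields.
        Object[] migrated = new Object[newNames.length];
        for (int i = 0; i < oldNames.length; i++) {
            Integer target = newIndex.get(oldNames[i]);
            if (target == null) {
                throw new IllegalStateException("Field removed in new schema: " + oldNames[i]);
            }
            migrated[target] = oldValues[i];
        }
        return migrated;
    }

    public static void main(String[] args) {
        Object[] migrated = migrate(
                new String[] {"id", "name"},
                new Object[] {1L, "a"},
                new String[] {"name", "age", "id"});
        System.out.println(Arrays.toString(migrated)); // prints [a, null, 1]
    }
}
```

The sketch rejects removed fields because the thread describes removals as an unsupported change; a real serializer snapshot would additionally have to compare field types and recurse into nested rows.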
> > > > > >> > > >
> > > > > >> > > > The proposed *migrateState* and *migrateElement* methods maintain backward compatibility with default implementations, while enabling RowData to perform specialized migration logic without requiring backend changes.
> > > > > >> > > >
> > > > > >> > > > I’ve updated the proposal document to include pseudo-code examples of *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot* class to illustrate this. Let me know if I missed anything.
> > > > > >> > > >
> > > > > >> > > > Best regards,
> > > > > >> > > > Weiqing
> > > > > >> > > >
> > > > > >> > > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <[email protected]> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi, Weiqing
> > > > > >> > > > >
> > > > > >> > > > > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData types, which will enhance the user experience of SQL jobs.
> > > > > >> > > > >
> > > > > >> > > > > I have one question for now:
> > > > > >> > > > >
> > > > > >> > > > > You suggested introducing new methods in `TypeSerializerSnapshot`, but is it possible to leverage the existing state migration procedure[1], which also performs deserialization and serialization with the old and new serializer respectively? IIUC, all we need is to properly implement `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2], since it will be invoked here[3]. No need for new methods, right?
> > > > > >> > > > >
> > > > > >> > > > > [1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > > > >> > > > > [2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > > > >> > > > > [3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > > > > >> > > > >
> > > > > >> > > > > Best,
> > > > > >> > > > > Zakelly
> > > > > >> > > > >
> > > > > >> > > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <[email protected]> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi all,
> > > > > >> > > > > >
> > > > > >> > > > > > I’d like to initiate a discussion about enhancing state schema evolution support for RowData in Flink.
> > > > > >> > > > > >
> > > > > >> > > > > > *Motivation*
> > > > > >> > > > > >
> > > > > >> > > > > > Flink applications frequently need to evolve their state schema as business requirements change. Currently, when users update a Table API or SQL job with schema changes involving RowData types (particularly nested structures), they encounter serialization compatibility errors during state restoration, causing job failures. The issue occurs because existing state migration mechanisms don't properly handle RowData types during schema evolution, preventing users from making backward-compatible changes like:
> > > > > >> > > > > >
> > > > > >> > > > > > - Adding nullable fields to existing structures
> > > > > >> > > > > > - Reordering fields within a row while preserving field names
> > > > > >> > > > > > - Evolving nested row structures
> > > > > >> > > > > >
> > > > > >> > > > > > This limitation impacts production applications using Flink's Table API, as the RowData type is central to this interface. Users are forced to choose between maintaining outdated schemas or reprocessing all state data when schema changes are required.
> > > > > >> > > > > >
> > > > > >> > > > > > Here’s the proposal document: Link <https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0>
> > > > > >> > > > > >
> > > > > >> > > > > > Your feedback and ideas are welcome to refine this feature.
> > > > > >> > > > > >
> > > > > >> > > > > > Thanks,
> > > > > >> > > > > > Weiqing
> > > > > >>
> > > > > >> --
> > > > > >> Best,
> > > > > >> Hangxiang.
