FWIW, Table/SQL has a mechanism [1] that generates uids based on the
type of the physical node. If we set it to "ALWAYS", uid generation is
no longer limited to compiled plans (which may have been edited by
users).

I haven't tried this in production yet, but I really like the idea of
keeping the uids of SQL jobs stable even when optimization changes the
DAG slightly, e.g. by adding or dropping a calc node. I've long been
looking for an easier way to keep SQL jobs state-compatible (both uid
stability and state evolution). We have already tried plan editing and
user-set hints; next I'll give [1] a try to see whether it is a better
solution for uid stability.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/config/#table-exec-uid-generation

Zhanghao Chen <zhanghao.c...@outlook.com> wrote on Thu, Feb 8, 2024 at 23:09:
>
> We're only concerned with parallelism tuning here (with the same Flink
> version). The plans will be compatible as long as the operator IDs stay the
> same. Currently, this only holds if we do not break/create a chain, and we
> want to make it hold when we break/create a chain as well. That's what the
> FLIP is all about.
>
> The typical user story is that one has a job with a uniform parallelism in
> the first place. The source is chained with an expensive operator. Later on,
> the job parallelism needs to be increased, but the source's can't be, due to
> limits like the number of Kafka partitions. The user then configures different
> parallelisms for the source and the remaining part of the job, which breaks a
> chain and leads to state incompatibility.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Chesnay Schepler <ches...@apache.org>
> Sent: Thursday, February 8, 2024 18:12
> To: dev@flink.apache.org <dev@flink.apache.org>; Martijn Visser 
> <martijnvis...@apache.org>
> Cc: Yu Chen <yuchen.e...@gmail.com>
> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for 
> improved state compatibility on parallelism change
>
> How exactly are you tuning SQL jobs without compiled plans while
> ensuring that the resulting compiled plans are compatible? That's
> explicitly not supported by Flink, hence why CompiledPlans exist.
> If you change _anything_ the planner is free to generate a completely
> different plan, where you have no guarantees that you can map the state
> between one another.
>
> On 08/02/2024 09:42, Martijn Visser wrote:
> > Hi,
> >
> >> However, compiled plan is still too complicated for Flink newbies from my 
> >> point of view.
> > I don't think that the compiled plan was ever positioned to be a
> > simple solution. If you want to have an easy approach, we have a
> > declarative solution in place with SQL and/or the Table API imho.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Thu, Feb 8, 2024 at 9:14 AM Zhanghao Chen <zhanghao.c...@outlook.com> 
> > wrote:
> >> Hi Piotr,
> >>
> >> Thanks for the comment. I agree that compiled plan is the ultimate tool 
> >> for Flink SQL if one wants to make any changes to
> >> the query later, and this FLIP indeed is not essential in this sense. However, 
> >> compiled plan is still too complicated for Flink newbies from my point of 
> >> view. As I mentioned previously, our internal platform provides a 
> >> visualized tool for editing the compiled plan but most users still find it 
> >> complex. Therefore, the FLIP can still benefit users with better 
> >> usability and the proposed changes are actually quite lightweight (just 
> >> copying a new hasher with 2 lines deleted + extending the OperatorIdPair 
> >> data structure) without much extra effort.
> >>
> >> Best,
> >> Zhanghao Chen
> >> ________________________________
> >> From: Piotr Nowojski <pnowoj...@apache.org>
> >> Sent: Thursday, February 8, 2024 14:50
> >> To: Zhanghao Chen <zhanghao.c...@outlook.com>
> >> Cc: Chesnay Schepler <ches...@apache.org>; dev@flink.apache.org 
> >> <dev@flink.apache.org>; Yu Chen <yuchen.e...@gmail.com>
> >> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation 
> >> for improved state compatibility on parallelism change
> >>
> >> Hey
> >>
> >>> AFAIK, there's no way to set UIDs for a SQL job,
> >> AFAIK you can't set UIDs manually, but Flink SQL generates a compiled plan
> >> of a query with embedded UIDs. As I understand it, using a compiled plan is
> >> the preferred (only?) way for Flink SQL if one wants to make any changes to
> >> the query later on or support Flink's runtime upgrades without losing the
> >> state.
> >>
> >> If that's the case, what would be the usefulness of this FLIP? Only for
> >> DataStream API for users that didn't know that they should have manually
> >> configured UIDs? But they have the workaround to actually post-factum add
> >> the UIDs anyway, right? So maybe indeed Chesnay is right that this FLIP is
> >> not that helpful/worth the extra effort?
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Thu, Feb 8, 2024 at 03:55, Zhanghao Chen <zhanghao.c...@outlook.com>
> >> wrote:
> >>
> >>> Hi Chesnay,
> >>>
> >>> AFAIK, there's no way to set UIDs for a SQL job. It would be great if you
> >>> could share how you allow UID setting for SQL jobs. We've explored providing
> >>> a visualized DAG editor for SQL jobs that allows UID setting on our internal
> >>> platform, but most users found it too complicated to use. Another
> >>> possible way is to utilize SQL hints, but that's complicated as well. From
> >>> our experience, many SQL users are not familiar with Flink; what they want
> >>> is an experience similar to writing normal SQL in MySQL, without
> >>> involving many extra concepts like the DAG and the UID. In fact, some
> >>> DataStream and PyFlink users also share the same concern.
> >>>
> >>> On the other hand, some performance tuning is inevitable for
> >>> long-running jobs in production, and parallelism tuning is among the most
> >>> common techniques. FLIP-367 [1] and FLIP-146 [2] allow users to tune the
> >>> parallelism of sources and sinks, and both were well received in their
> >>> discussion threads. Users definitely don't want to lose state after a
> >>> parallelism tuning, which is highly risky at present.
> >>>
> >>> Putting these together, I think the FLIP has a high value in production.
> >>> Through offline discussion, I learned that multiple companies have developed
> >>> or are trying to develop similar hasher changes in their internal
> >>> distributions, including ByteDance, Xiaohongshu, and Bilibili. It'll be
> >>> great if we can improve the SQL experience for all community users as well,
> >>> WDYT?
> >>>
> >>> Best,
> >>> Zhanghao Chen
> >>> ------------------------------
> >>> *From:* Chesnay Schepler <ches...@apache.org>
> >>> *Sent:* Thursday, February 8, 2024 2:01
> >>> *To:* dev@flink.apache.org <dev@flink.apache.org>; Zhanghao Chen <
> >>> zhanghao.c...@outlook.com>; Piotr Nowojski <pnowoj...@apache.org>; Yu
> >>> Chen <yuchen.e...@gmail.com>
> >>> *Subject:* Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> >>> generation for improved state compatibility on parallelism change
> >>>
> >>> The FLIP is a bit weird to be honest. It only applies in cases where
> >>> users haven't set uids, but that goes against best-practices and as far
> >>> as I'm told SQL also sets UIDs everywhere.
> >>>
> >>> I'm wondering if this is really worth the effort.
> >>>
> >>> On 07/02/2024 10:23, Zhanghao Chen wrote:
> >>>> After offline discussion with @Yu Chen <yuchen.e...@gmail.com>,
> >>> I've updated the FLIP [1] to include a design
> >>> that allows for compatible hasher upgrade by adding StreamGraphHasherV2 to
> >>> the legacy hasher list, which is actually a revival of the idea from
> >>> FLINK-5290 [2] when StreamGraphHasherV2 was introduced in Flink 1.2. We
> >>> aim to make V3 the default hasher in Flink 1.20 given that
> >>> state compatibility is no longer an issue. Please take a look when you have
> >>> a chance. I'd like to especially thank @Yu Chen <yuchen.e...@gmail.com>
> >>> for the thorough offline discussion and code debugging help that made this
> >>> possible.
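
The legacy-hasher idea above can be pictured with a toy lookup. This is only a sketch under assumptions, not Flink's restore code: `restore_state`, the dictionary-based snapshot, and the ID strings are all hypothetical names made up for illustration. It shows state being looked up under the ID from the default hasher first, then under the IDs produced by the hashers on the legacy list.

```python
def restore_state(snapshot, primary_id, legacy_ids):
    """Toy restore: try the default hasher's ID, then each legacy hasher's ID.

    `snapshot` is a hypothetical dict mapping operator ID -> operator state.
    """
    if primary_id in snapshot:
        return snapshot[primary_id]
    for legacy_id in legacy_ids:
        if legacy_id in snapshot:
            return snapshot[legacy_id]
    return None  # no state found for this operator


# Hypothetical snapshot written while the old (V2-style) hasher was the default.
old_snapshot = {"v2-source-id": {"offset": 42}}

# After the switch, the new ID is the primary one and the old ID is a legacy alternative.
restored = restore_state(old_snapshot, "v3-source-id", ["v2-source-id"])
```

Without the legacy entry the lookup finds nothing, which is the incompatibility raised earlier in the thread.
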
> >>>> [1]
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> >>>> [2] https://issues.apache.org/jira/browse/FLINK-5290
> >>>>
> >>>> Best,
> >>>> Zhanghao Chen
> >>>> ________________________________
> >>>> From: Zhanghao Chen <zhanghao.c...@outlook.com>
> >>>> Sent: Friday, January 12, 2024 10:46
> >>>> To: Piotr Nowojski <pnowoj...@apache.org>; Yu Chen <
> >>> yuchen.e...@gmail.com>
> >>>> Cc: dev@flink.apache.org <dev@flink.apache.org>
> >>>> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> >>> generation for improved state compatibility on parallelism change
> >>>> Thanks for the input, Piotr. It might still be possible to make it
> >>> compatible with the old snapshots, following the direction of FLINK-5290<
> >>> https://issues.apache.org/jira/browse/FLINK-5290> suggested by Yu. I'll
> >>> discuss the details with Yu.
> >>>> Best,
> >>>> Zhanghao Chen
> >>>> ________________________________
> >>>> From: Piotr Nowojski <pnowoj...@apache.org>
> >>>> Sent: Friday, January 12, 2024 1:55
> >>>> To: Yu Chen <yuchen.e...@gmail.com>
> >>>> Cc: Zhanghao Chen <zhanghao.c...@outlook.com>; dev@flink.apache.org <
> >>> dev@flink.apache.org>
> >>>> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> >>> generation for improved state compatibility on parallelism change
> >>>> Hi,
> >>>>
> >>>> Using unaligned checkpoints is orthogonal to this FLIP.
> >>>>
> >>>> Yes, unaligned checkpoints are not supported for pointwise connections,
> >>> so most of the cases go away anyway.
> >>>> It is possible to switch from unchained to chained subtasks by removing
> >>> a keyBy exchange, and this would be
> >>>> a problem, but that's just one of the things that we claim that
> >>> unaligned checkpoints do not support [1]. But as
> >>>> I stated above, this is an orthogonal issue to this FLIP.
> >>>>
> >>>> Regarding the proposal itself, generally speaking it makes sense to me
> >>> as well. However I'm quite worried about
> >>>> the compatibility and/or migration path. The:
> >>>>> (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.
> >>>> step would break the compatibility with Flink 1.xx snapshots. But as
> >>> this is for v2.0, maybe that's not the end of
> >>>> the world?
> >>>>
> >>>> Best,
> >>>> Piotrek
> >>>>
> >>>> [1]
> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations
> >>>> On Thu, Jan 11, 2024 at 12:10, Yu Chen <yuchen.e...@gmail.com> wrote:
> >>>> Hi Zhanghao,
> >>>>
> >>>> Actually, Stefan has done similar compatibility work in the early
> >>> FLINK-5290[1], where he introduced the legacyStreamGraphHashers list for
> >>> hasher backward compatibility.
> >>>> We have attempted to implement a similar feature in the internal version
> >>> of FLINK and tried to include the new hasher as part of the
> >>> legacyStreamGraphHashers,
> >>>> which would ensure that the corresponding Operator State could be found
> >>> at restore while ignoring the chaining condition(without changing the
> >>> default hasher).
> >>>> However, we have found that such a solution may lead to some unexpected
> >>> situations in some cases, and I have not had time to find the root cause
> >>> yet.
> >>>> If you're interested, I'd be happy to discuss it with you and try to
> >>> solve the problem.
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-5290
> >>>>
> >>>> Best,
> >>>> Yu Chen
> >>>>
> >>>>
> >>>>
> >>>> On Jan 11, 2024, at 15:07, Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
> >>>> Hi Yu,
> >>>>
> >>>> I haven't thought too much about the compatibility design before. By the
> >>> nature of the problem, it's impossible to make V3 compatible with V2. What
> >>> we can do is to better inform users when switching the hasher, but
> >>> I don't have any good idea so far. Do you have any suggestions on this?
> >>>> Best,
> >>>> Zhanghao Chen
> >>>> ________________________________
> >>>> From: Yu Chen <yuchen.e...@gmail.com>
> >>>> Sent: Thursday, January 11, 2024 13:52
> >>>> To: dev@flink.apache.org <dev@flink.apache.org>
> >>>> Cc: Piotr Nowojski <piotr.nowoj...@gmail.com>; zhanghao.c...@outlook.com
> >>> <zhanghao.c...@outlook.com>
> >>>> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> >>> generation for improved state compatibility on parallelism change
> >>>> Hi Zhanghao,
> >>>>
> >>>> Thanks for driving this. It's really painful for us when we need to
> >>> switch the config `pipeline.operator-chaining`.
> >>>> But I have a concern: according to the FLIP description, modifying the
> >>> `isChainable`-related code in `StreamGraphHasherV2` will cause the
> >>> generated operator IDs to change, which will leave the user unable
> >>> to recover from the old state (old and new Operator IDs can't be mapped).
> >>>> Therefore switching the hasher strategy (V2->V3 or V3->V2) will lead to an
> >>> incompatibility. Has any compatibility design been considered?
> >>>> Best,
> >>>> Yu Chen
> >>>>
> >>>> On Jan 10, 2024, at 10:25, Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
> >>>> Hi David,
> >>>>
> >>>> Thanks for the comments. AFAIK, unaligned checkpoints are disabled for
> >>> pointwise connections according to [1]; let's wait for Piotr to confirm.
> >>> The issue itself is not directly related to this proposal either. If a
> >>> user manually specifies UIDs for each of the chained operators and has
> >>> unaligned checkpoints enabled, we will encounter the same issue if they
> >>> decide to break the chain on a later restart and try to recover from a
> >>> retained checkpoint.
> >>>> [1]
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/
> >>>>
> >>>> Best,
> >>>> Zhanghao Chen
> >>>> ________________________________
> >>>> From: David Morávek <d...@apache.org>
> >>>> Sent: Wednesday, January 10, 2024 6:26
> >>>> To: dev@flink.apache.org <dev@flink.apache.org>; Piotr Nowojski
> >>> <piotr.nowoj...@gmail.com>
> >>>> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> >>> generation for improved state compatibility on parallelism change
> >>>> Hi Zhanghao,
> >>>>
> >>>> Thanks for the FLIP. What you're proposing makes a lot of sense +1
> >>>>
> >>>> Have you thought about how this works with unaligned checkpoints in case
> >>>> you go from unchained to chained? I think it should be fine because this
> >>>> scenario should only apply to forward/rebalance scenarios where we, as far
> >>>> as I recall, force alignment anyway, so there should be no exchanges to
> >>>> snapshot. It might just work, but something to double-check. Maybe @Piotr
> >>>> Nowojski <piotr.nowoj...@gmail.com> could confirm it.
> >>>> Best,
> >>>> D.
> >>>>
> >>>> On Wed, Jan 3, 2024 at 7:10 AM Zhanghao Chen <zhanghao.c...@outlook.com>
> >>>> wrote:
> >>>>
> >>>> Dear Flink devs,
> >>>>
> >>>> I'd like to start a discussion on FLIP 411: Chaining-agnostic Operator ID
> >>>> generation for improved state compatibility on parallelism change [1].
> >>>>
> >>>> Currently, when a user does not explicitly set operator UIDs, the chaining
> >>>> behavior will still affect state compatibility, as the generation of the
> >>>> Operator ID is dependent on its chained output nodes. For example, a simple
> >>>> source->sink DAG with source and sink chained together is state
> >>>> incompatible with an otherwise identical DAG with source and sink unchained
> >>>> (either because the parallelisms of the two ops are changed to be unequal
> >>>> or chaining is disabled). This greatly limits the flexibility to perform
> >>>> chain-breaking/building for performance tuning.
> >>>>
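
The source->sink example above can be mimicked with a toy hasher. This is a deliberately simplified sketch, not the StreamGraphHasherV2 algorithm: the functions `operator_id` and `source_sink_ids` and the exact hash inputs are assumptions chosen only to show why feeding chaining information into the hash changes the IDs, while leaving it out keeps them stable.

```python
import hashlib


def operator_id(position, input_ids, chained_outputs, chaining_aware):
    """Toy operator ID built from the node position and the IDs of its inputs.

    If `chaining_aware` is True, the number of chained output nodes is mixed
    into the hash as well (hypothetical stand-in for the behavior described above).
    """
    h = hashlib.sha256()
    h.update(str(position).encode())
    for input_id in input_ids:
        h.update(input_id.encode())
    if chaining_aware:
        h.update(f"chained:{chained_outputs}".encode())
    return h.hexdigest()[:16]


def source_sink_ids(chained, chaining_aware):
    """IDs for a source->sink DAG; `chained` says whether the two ops are chained."""
    source = operator_id(0, [], 1 if chained else 0, chaining_aware)
    sink = operator_id(1, [source], 0, chaining_aware)
    return source, sink


# Chaining-aware hashing: breaking the chain changes the operator IDs.
ids_differ = source_sink_ids(True, True) != source_sink_ids(False, True)
# Chaining-agnostic hashing: the IDs are the same either way.
ids_stable = source_sink_ids(True, False) == source_sink_ids(False, False)
```

In the chaining-aware case the sink's ID changes too, because it is derived from the source's ID.
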
> >>>> The dependency on chained output nodes for Operator ID generation can be
> >>>> traced back to Flink 1.2. It is unclear at this point why chained output
> >>>> nodes are involved in the algorithm, but the following historical
> >>>> background might be related: prior to Flink 1.3, the Flink runtime took
> >>>> snapshots by the operator ID of the first vertex in a chain, so it somewhat
> >>>> made sense to include chained output nodes in the algorithm, as
> >>>> chain-breaking/building was expected to break state compatibility anyway.
> >>>>
> >>>> Given that operator-level state recovery within a chain has been
> >>>> supported since Flink 1.3, I propose to introduce StreamGraphHasherV3, which
> >>>> is agnostic of the chaining behavior of operators, so that users are free
> >>>> to tune the parallelism of individual operators without worrying about
> >>>> state incompatibility. We can make the V3 hasher an optional choice in
> >>>> Flink 1.19, and make it the default hasher in 2.0 for backwards
> >>>> compatibility.
> >>>>
> >>>> Looking forward to your suggestions on it, thanks~
> >>>>
> >>>> [1]
> >>>>
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> >>>> Best,
> >>>> Zhanghao Chen
> >>>>
> >>>
>


-- 

Best,
Benchao Li
