Hi Gyula,

Thanks for your reply!


Let me answer these questions:


> What is the semantics of the usesStates method? When is it called? Can
> the used state change dynamically at runtime? Can the logic depend on
> something computed in open(..) for example?


usesStates is used to predefine all the states that the process
function needs to access. In other words, we want to avoid declaring
state dynamically at runtime, which allows the SQL planner and the
JM to optimize the job better. As a result, this logic must be fully
available at compile time (when the JobGraph is generated), so it
can't rely on computations that are executed after deployment to the TM.
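
To make the "declare before use" idea concrete, here is a minimal sketch. The types below (SketchStateDeclaration, SketchProcessFunction, CountingFunction, UsesStatesSketch) are hypothetical stand-ins I made up for illustration, not the interfaces from the FLIP; only the usesStates name comes from the proposal.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a state declaration; not the FLIP's real type.
final class SketchStateDeclaration {
    final String name;

    SketchStateDeclaration(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a process function that predefines its states.
interface SketchProcessFunction {
    // Meant to be called while the job graph is built, i.e. before open(..)
    // runs, so it may only use information available at compile time.
    Set<SketchStateDeclaration> usesStates();
}

final class CountingFunction implements SketchProcessFunction {
    // Declared as constants: nothing here depends on runtime context.
    static final SketchStateDeclaration COUNT = new SketchStateDeclaration("count");
    static final SketchStateDeclaration LAST_SEEN = new SketchStateDeclaration("last-seen");

    @Override
    public Set<SketchStateDeclaration> usesStates() {
        Set<SketchStateDeclaration> states = new LinkedHashSet<>();
        states.add(COUNT);
        states.add(LAST_SEEN);
        return states;
    }
}

final class UsesStatesSketch {
    // Mimics what a planner could do: inspect the declarations
    // without deploying or running the function.
    static int declaredStateCount(SketchProcessFunction fn) {
        return fn.usesStates().size();
    }
}
```

The point of the sketch is only that the full set of declarations can be read from the function object itself, with no task running.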


> Currently state access is pretty dynamic in Flink and I would assume
> many jobs create states on the fly based on some required logic. Are
> we planning to address these use-cases?


It depends on what kind of context is needed. If the type and number of
states depend on runtime context, that's something we want to avoid.
If they depend only on information available at compile time, I think
we could support it.


> Are we planning to support deleting/dropping states that are not
> required anymore?


We really don't want users to be able to dynamically declare or delete
a state at runtime, but if you just want to clear the value of a
state, the new API works the same as the old API.


> I think if a state is not declared or otherwise cannot be accessed, an
> exceptions must be thrown. We cannot confuse empty value with something
> inaccessible.


After thinking about it a bit more, I think you have a point!
It's important to make a clear distinction between an empty state and
illegal access, especially since Flink currently discourages setting a
non-null default value for the state.
I will modify the proposal as you suggested then :)
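
Roughly, the distinction could look like the sketch below. SketchStateManager and its methods are hypothetical names for illustration only, and the exception type is my assumption; the FLIP will define the real signatures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical state manager; names and exception type are illustrative only.
final class SketchStateManager {
    private final Set<String> declared = new HashSet<>();
    private final Map<String, String> values = new HashMap<>();

    SketchStateManager(Set<String> declaredNames) {
        declared.addAll(declaredNames);
    }

    // An empty Optional means "declared, but no value yet".
    // Accessing an undeclared state is an error, never an empty result.
    Optional<String> getValue(String name) {
        requireDeclared(name);
        return Optional.ofNullable(values.get(name));
    }

    void update(String name, String value) {
        requireDeclared(name);
        values.put(name, value);
    }

    // Clearing removes the value only; the declaration itself stays.
    void clear(String name) {
        requireDeclared(name);
        values.remove(name);
    }

    private void requireDeclared(String name) {
        if (!declared.contains(name)) {
            throw new IllegalStateException("State '" + name + "' was not declared");
        }
    }
}
```

With this shape, clearing a value and then reading it yields an empty result, while reading a name that was never declared fails loudly.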


> The RedistributionMode enum sounds a bit strange to me, as it doesn't
> actually specify a mode of redistribution. It feels more like a flag. Can
> we simply have an Optional<RedistributionStrategy> instead?


We actually define three RedistributionMode values instead of two because we
don't want to treat IDENTICAL as a redistribution strategy; it's just an
invariant: state of that type is always the same across partitions. If the
enum only had NONE and REDISTRIBUTABLE, I think your proposal would be
feasible. But we don't want to conflate these three semantics/modes.
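
A small sketch of why an Optional is not quite enough here. The enum and helper names are hypothetical, and the strategy is modelled as a plain String purely for illustration:

```java
import java.util.Optional;

// Hypothetical mirror of the three modes discussed in this thread.
enum SketchRedistributionMode {
    NONE,            // state is never redistributed
    REDISTRIBUTABLE, // state is redistributed according to some strategy
    IDENTICAL        // invariant: state is the same on every partition
}

final class RedistributionSketch {
    // Only REDISTRIBUTABLE state needs an accompanying strategy.
    static boolean needsStrategy(SketchRedistributionMode mode) {
        return mode == SketchRedistributionMode.REDISTRIBUTABLE;
    }

    // An Optional strategy can express only two cases:
    // present maps to REDISTRIBUTABLE, absent maps to NONE.
    // There is no Optional value left over to mean IDENTICAL.
    static SketchRedistributionMode fromOptional(Optional<String> strategy) {
        return strategy.isPresent()
                ? SketchRedistributionMode.REDISTRIBUTABLE
                : SketchRedistributionMode.NONE;
    }
}
```

So IDENTICAL would need either a fake "strategy" or a separate flag, which is what the three-valued enum avoids.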


> BroadcastStates are currently very limited by only Map-like states, and
> the new interface also enforces that. Can we remove this limitation? If
> not, should broadcastState declaration extend mapstate declaration?


Personally, I don't want to impose this restriction. This is also why
the StateManager method that returns a BroadcastState takes a
BroadcastStateDeclaration parameter instead of a MapStateDeclaration. In
the future, if the state backend supports other types of broadcast state,
we can add a corresponding method to the States utility class to get
the BroadcastStateDeclaration.
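
As a rough sketch of that decoupling (all type and method names below are hypothetical placeholders, not the FLIP's actual signatures):

```java
// Hypothetical declaration types, deliberately NOT related by inheritance.
interface SketchMapStateDeclaration {
    String name();
}

interface SketchBroadcastStateDeclaration {
    String name();
}

final class SketchStates {
    // Today the only factory would build a map-like broadcast state.
    // A future backend could add another factory here (e.g. list-like)
    // without changing the manager method below.
    static SketchBroadcastStateDeclaration mapBroadcastState(String name) {
        return () -> name;
    }
}

final class SketchBroadcastStateManager {
    // Accepts the broadcast declaration type rather than the map
    // declaration type, so the signature does not assume map-like state.
    static String describe(SketchBroadcastStateDeclaration declaration) {
        return "broadcast:" + declaration.name();
    }
}
```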



Best regards,

Weijie


Hangxiang Yu <master...@gmail.com> wrote on Thu, Mar 7, 2024 at 11:55:

> Hi, Weijie.
> Thanks for your proposal.
> I'd like to start the discussion with some questions:
> 1. We have also discussed in FLIP-359/FLINK-32658 about limiting the user
> operation to avoid creating state when processElement. Could current
> interfaces also help this?
>
> 2. Could you provide more examples about how useStates() works ? Since some
> operations may change their used states at runtime, the value this method
> returns will be modified at runtime, right ?
> If so, I'm thinking if we could get some deterministic State Declaration
> Set before running which could help a lot for some state operations e.g.
> pre-check schema compatibility, queryable schema.
>
> 3. IIUC, RedistributionMode/Strategy should not be used by users, right ?
> If so, I'm +1 to move them to inner interfaces which seems a bit confusing
> to users.
>
> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Weijie,
> >
> > Thanks for proposing this!
> >
> > Unifying and optimizing state definitions is a very good thing. I like
> the
> > idea of 'definition goes before using', so overall +1 for this proposal.
> >
> > However, I think the current definition is somewhat unclear. From a
> user's
> > point of view, I believe that state can be characterized along two
> > relatively independent axes: the scenario (keyed, non-keyed, or
> broadcast)
> > and the data structure (single value, list, map). I recommend that we
> fully
> > decouple these aspects, rather than linking the nature of the definition
> to
> > specific assumptions, such as equating broadcast states with maps, or
> > considering list states could be non-keyed.
> > Furthermore, the concept of 'Redistribution' may impose a cognitive
> burden
> > on general users. My advice would be to conceal
> RedistributionMode/Strategy
> > from the standard user interface, particularly within the helper class
> > 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces
> > are basically used by the framework. My preferred syntax would be:
> > ```
> > StateDeclaration a = State.declare(name).keyed().listState(type);
> > StateDeclaration b = State.declare(name).broadcast().mapState(typeK,
> > typeV);
> > StateDeclaration c = State.declare(name).keyed().aggregatingState(type,
> > function);
> > ```
> > WDYT?
> >
> >
> > Best,
> > Zakelly
> >
> > On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hi Weijie!
> > >
> > > Thank you for the proposal.
> > >
> > > I have some initial questions to start the discussion:
> > >
> > > 1. What is the semantics of the usesStates method? When is it called?
> Can
> > > the used state change dynamically at runtime? Can the logic depend on
> > > something computed in open(..) for example?
> > >
> > > Currently state access is pretty dynamic in Flink and I would assume
> many
> > > jobs create states on the fly based on some required logic. Are we
> > planning
> > > to address these use-cases?
> > >
> > > Are we planning to support deleting/dropping states that are not
> required
> > > anymore?
> > >
> > > 2. Get state now returns an optional, but you mention that:
> > > " If you want to get a state that is not declared or has no access,
> > > Option#empty is returned."
> > >
> > > I think if a state is not declared or otherwise cannot be accessed, an
> > > exceptions must be thrown. We cannot confuse empty value with something
> > > inaccessible.
> > >
> > > 3. The RedistributionMode enum sounds a bit strange to me, as it
> doesn't
> > > actually specify a mode of redistribution. It feels more like a flag.
> Can
> > > we simply have an Optional<RedistributionStrategy> instead?
> > >
> > > 4. BroadcastStates are currently very limited by only Map-like states,
> > and
> > > the new interface also enforces that.
> > > Can we remove this limitation? If not, should broadcastState
> declaration
> > > extend mapstate declaration?
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Wed, Mar 6, 2024 at 11:18 AM weijie guo <guoweijieres...@gmail.com>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion about FLIP-433: State Access on
> > > > DataStream API V2
> > > > [1]. This is the third sub-FLIP of DataStream API V2.
> > > >
> > > >
> > > > After FLIP-410 [2], we can already write a simple stateless job using
> > the
> > > > DataStream V2 API.  But as we all know, stateful computing is Flink's
> > > trump
> > > > card. In this FLIP, we will discuss how to declare and access state
> on
> > > > DataStream API V2 and we manage to avoid some of the shortcomings of
> V1
> > > in
> > > > this regard.
> > > >
> > > > You can find more details in this FLIP. Its relationship with other
> > > > sub-FLIPs can be found in the umbrella FLIP
> > > > [3]. Looking forward to hearing from you, thanks!
> > > >
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
> > > >
> > > > [2]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > > >
> > > > [3]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> > > >
> > >
> >
>
>
> --
> Best,
> Hangxiang.
>
