Hi Guozhang,

Sorry I haven't had the time to respond to your earlier email, but I just
wanted to clarify something with respect to your most recent email.

My original plan in option A is to remove the entire Task ID from the State
Store path, which would insulate it from any changes to the Task ID format
introduced by Named Topologies or anything else. This would in fact
consolidate the store for the instance, rather than by-Task (which I think
is what you meant by "one physical store per state"?).

I did highlight in option C the possibility of changing the format of the
Task ID to change the sub-topology ID from an ordinal to a stable
identifier. Although I'm not convinced that this option is viable, or even
desirable.

Regards,

Nick

On Sat, 12 Feb 2022 at 00:36, Guozhang Wang <wangg...@gmail.com> wrote:

> Just to follow-up on this thread, I had another chat with John regarding
> option a) and I think the key thought is that, today the task-id is in the
> form of [sub-topologyID]-[partitionID] --- and in the future with
> named-topology it could be extended to three digits as
> [named-topologyID]-[sub-topologyID]-[partitionID] --- and for the purpose
> of this KIP's option A), we actually just want to remove the
> [sub-topologyID] from the taskID as part of the file path hierarchy, right?
>
> If yes, given that in the future we want:
>
> * allow topology evolution with compatibility validations.
> * consolidating persistent state stores so that we do not have one physical
> store per state, but potentially one store for the whole instance.
>
> No matter if we want to provide certain tooling for mapping the persistent
> state path / names as in option B), pursuing some solutions in the
> direction of option A) to be independent of the sub-topologyID since state
> store names within a topology should be sufficiently unique would make a
> lot of sense.
>
>
> On Mon, Feb 7, 2022 at 3:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nick,
> >
> > I think I'm on the same page of the scope of your KIP, and what I was
> > trying to get is that, there are some other efforts going on in parallel
> > that tries to identify if two topologies, or some part of them, are
> > isomorphic in structure, and hence their corresponding persistent states
> > may be reusable. That's why I was saying that "assume, we know which
> > persistent states can be reusable". i.e. let's say we know that the new
> > topology's sub-topology 1's state store A-0005 is the same as the old
> > topology's sub-topology 0' state store A-0004, then what we can do to let
> > the new topology state store to be loaded as the old state store. With
> that
> > in my mind originally, I said maybe option B) is sufficient to rename the
> > dir path / state store names before we start the new app's topology. But
> > that's just one aspect of it and we do not necessarily need to follow :)
> If
> > you are up to do a prototype following option A) with a good upgrade
> path,
> > it would be a great solution too.
> >
> >
> > On Mon, Feb 7, 2022 at 8:59 AM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Thanks, Nick,
> >>
> >> It sounds like we're on the same page. I didn't think (A)
> >> would be fundamentally "hard", just that it might be a pain
> >> in practice. Reading your response, if you're up for it, it
> >> sounds like a prototype of (A) would be the tie-breaker
> >> between the two approaches.
> >>
> >> To be honest, I've been burned enough times that I tend to
> >> prototype my KIPs more often than not anyway.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Mon, 2022-02-07 at 11:42 +0000, Nick Telford wrote:
> >> > Hi everyone,
> >> >
> >> > Guozhang, the scope of my KIP is specifically about deploying
> structural
> >> > changes to existing applications, i.e. "upgrades". Sharing state
> between
> >> > different applications was not in the scope of my original proposal.
> >> >
> >> > John's email has it exactly right, and I think this points to my KIP
> not
> >> > explaining the problem correctly. Any suggestions on how I could
> better
> >> > clarify the intent of my proposal in the KIP?
> >> >
> >> > John, regarding your comments:
> >> >
> >> > A) being difficult to clean up state after migrations. Unless I've
> >> missed
> >> > something, this shouldn't be a problem. Tasks are already internally
> >> aware
> >> > of which stores they own from the Topology structure, irrespective of
> >> where
> >> > on-disk the StateStore data is, they should be able to find it. I
> think
> >> the
> >> > only real issue with this approach is that it will require changing,
> >> most
> >> > likely, quite a bit of code. We'll need to separate the concept of
> >> "state
> >> > directory" from "task directory", (which will still be needed to store
> >> Task
> >> > meta-data, like .lock files). At the very least, I think significant
> >> > changes may need to be made to StateDirectory and StateManager, but I
> >> > haven't investigated in detail. Perhaps it would make sense to first
> >> > explore this approach with a prototype to see how invasive it would
> >> become?
> >> >
> >> > B) My intent was always that this process would occur between
> >> > KafkaStreams.start() and threads actually starting, so that the
> >> migration
> >> > would occur safely. I'm not sure what kind of unexpected structural
> >> changes
> >> > could be detected by such a process; it might just be useful for
> general
> >> > validation. The main reasons I prefer (A) is that: 1) (B) requires
> >> > additional state meta-data, whereas (A) does not, which is an increase
> >> in
> >> > system complexity and; 2) I believe that (A) actually addresses a
> >> semantic
> >> > bug: specifically that StateStores are tightly coupled to Tasks, which
> >> is
> >> > unnecessary. Reducing this coupling would add no complexity, and
> >> > potentially simplify other processes in the future.
> >> >
> >> > Regards,
> >> >
> >> > Nick
> >> >
> >> > On Sat, 5 Feb 2022 at 17:19, John Roesler <vvcep...@apache.org>
> wrote:
> >> >
> >> > > Hello all,
> >> > >
> >> > > Thanks for the KIP, Nick!
> >> > >
> >> > > Based on this conversation, I think I might have misread the
> >> > > KIP, but it looks like Nick is just proposing a small fix to
> >> > > the existing compatability mechanism.
> >> > >
> >> > > Although we tell people to avoid changing topologies on the
> >> > > fly in general, we also tell them that, if they name all the
> >> > > persistent resources (stores and repartition nodes), then
> >> > > they can change the topologies without breaking anything
> >> > > (provided the change itself is logically sound).
> >> > >
> >> > > It seems like this KIP is just pointing out a flaw in that
> >> > > mechanism, that the (named) stores are kept inside the task
> >> > > directories, so if some change renumbers the tasks, Streams
> >> > > won't be able to find the local store files anymore. IIUC,
> >> > > the changelog topic will still be fine, so Streams would
> >> > > just allocate a new state directory in the new task name and
> >> > > restore the changelog into it.
> >> > >
> >> > > So, I think all this KIP is after is a way to preserve the
> >> > > local state files of a named store in the face of task
> >> > > renumbering. That's not to say that there's not some overlap
> >> > > with the NamedTopologies work, or that there's no value in
> >> > > being able to automatically reuse unnamed stores. But it
> >> > > probably makes sense to let Nick fix this one specific
> >> > > problem instead of coupling it to other large-scale
> >> > > engineering projects.
> >> > >
> >> > > Regarding the KIP itself:
> >> > >
> >> > > (A) is quite clean, but it does make it more challenging to
> >> > > clean up state when tasks migrate to other nodes. If that's
> >> > > the only problem, then I agree this is probably the best
> >> > > solution.
> >> > >
> >> > > (B) also makes a lot of sense to me, and I actually don't
> >> > > think it's a hack. It might also be useful for detecting
> >> > > when a topology has changed unexpectedly, for example. On
> >> > > the other hand, to safely move a state directory from one
> >> > > task directory to the other, we have to be sure no other
> >> > > thread is using either directory. To do that, we could
> >> > > either perform the operation in `KafkaStreams.start()`
> >> > > before any threads are started (we already know the topology
> >> > > at this point), or we can try to grab the directory locks on
> >> > > both tasks (but that sounds like a recipe for deadlock).
> >> > >
> >> > > In a nutshell, I'm supportive of this KIP, and I'd sugest we
> >> > > do a little more discovery on the implications of dropping
> >> > > the task level of the directory hierarchy before committing
> >> > > to A. And/or be a little more specific about how we can
> >> > > safely move state directories around before committing to B.
> >> > >
> >> > > Thanks again!
> >> > > -John
> >> > >
> >> > > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
> >> > > > Hi folks,
> >> > > >
> >> > > > I think the NamedTopology work would help with the convenience of
> >> the
> >> > > > solution for this KIP, but I feel it is not by itself the solution
> >> here.
> >> > > If
> >> > > > I'm not mistaken, the scope of this KIP is trying to tackle that,
> >> > > *assuming
> >> > > > the developer already knows* a new topology or part of the
> topology
> >> e.g.
> >> > > > like a state store of the topology does not change, then how to
> >> > > effectively
> >> > > > be able to reuse that part of the topology. Today it is very hard
> to
> >> > > reuse
> >> > > > part (say a state store, an internal topic) of a previous
> topology's
> >> > > > persistent state because:
> >> > > >
> >> > > > 1) the names of those persistent states are prefixed by the
> >> application
> >> > > id.
> >> > > > 2) the names of those persistent states are suffixed by the index,
> >> which
> >> > > > reflects the structure of the topology.
> >> > > > 3) the dir path of the persistent states are "prefixed" by the
> task
> >> id,
> >> > > > which is hence dependent on the sub-topology id.
> >> > > >
> >> > > > My quick thoughts are that 1) is easy to go around as long as
> users
> >> reuse
> >> > > > the same appId, 3) can be tackled with the help of the named
> >> topology but
> >> > > > each named topology can still be composed of multiple
> >> sub-topologies so
> >> > > > extra work is still needed to align the sub-topology ids, but we
> >> still
> >> > > need
> >> > > > something to tackle 2) here, which I was pondering between those
> >> options
> >> > > > and at the moment leaning towards option 2).
> >> > > >
> >> > > > Does that make sense to you?
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <
> nick.telf...@gmail.com
> >> >
> >> > > wrote:
> >> > > >
> >> > > > > Hi Guozhang, Sophie,
> >> > > > >
> >> > > > > Thanks for both taking the time to review my proposal.
> >> > > > >
> >> > > > > I did actually see the NamedTopology classes, and noted that
> they
> >> were
> >> > > > > internal. I didn't realise they are part of an intended solution
> >> to
> >> > > this
> >> > > > > problem, that's very interesting. I'm going to try to find some
> >> time to
> >> > > > > take a look at your experimental work so I can understand it a
> bit
> >> > > better.
> >> > > > >
> >> > > > > From your description, it sounds like the NamedTopology approach
> >> should
> >> > > > > enable users to solve this problem at the level that they wish
> >> to. My
> >> > > > > concern is that users will need to be explicit about how their
> >> > > Topology is
> >> > > > > structured, and will need to know in advance how their
> Topologies
> >> might
> >> > > > > evolve in the future in order to correctly break them up by
> name.
> >> For
> >> > > > > example, if a user mistakenly assumes one particular structure
> for
> >> > > their
> >> > > > > application, but later makes changes that implicitly cause an
> >> existing
> >> > > > > NamedTopology to have its internal Subtopologies re-ordered, the
> >> user
> >> > > will
> >> > > > > need to clear all the local state for that NamedTopology, at
> >> least.
> >> > > > >
> >> > > > > Unless I'm mistaken, StateStores are defined exclusively by the
> >> data in
> >> > > > > their changelogs. Even if you make changes to a Topology that
> >> requires
> >> > > > > clearing locally materialized state, the changelogs aren't
> >> reset[1],
> >> > > so the
> >> > > > > newly rebuilt state is materialized from the pre-existing
> values.
> >> Even
> >> > > if
> >> > > > > changes are made to the Subtopology that writes to the
> >> StateStore, the
> >> > > > > existing data in the changelog hasn't changed. The contents of
> the
> >> > > > > StateStore evolves. This is exactly the same as a traditional
> >> database
> >> > > > > table, where a client may evolve its behaviour to subtly change
> >> the
> >> > > > > semantics of the data written to the table, without deleting the
> >> > > existing
> >> > > > > data.
> >> > > > >
> >> > > > > If a user makes a change that means a different Subtopology
> reads
> >> from
> >> > > the
> >> > > > > StateStore, the semantics of, and the data in the store itself
> >> hasn't
> >> > > > > actually changed. The only reason we need to reset this local
> >> state at
> >> > > all
> >> > > > > is due to the conflict on-disk caused by the change in
> Subtopology
> >> > > ordinal.
> >> > > > > If local StateStore data was decoupled from Tasks, this conflict
> >> would
> >> > > > > disappear, and the application would work as expected.
> >> > > > >
> >> > > > > A Subtopology is defined by all connected topics, including
> >> changelogs,
> >> > > > > repartition topics, source topics and sink topics. Whereas a
> >> > > StateStore is
> >> > > > > defined exclusively by its changelog. So why do we tightly
> couple
> >> > > > > StateStore to Subtopology? This is my central argument for
> option
> >> A
> >> > > that I
> >> > > > > outlined in the KIP, and I would like to discuss it further,
> even
> >> if
> >> > > only
> >> > > > > to educate myself on why it's not possible :-D
> >> > > > >
> >> > > > > I still think the NamedTopology work is valuable, but more as a
> >> means
> >> > > to
> >> > > > > better organize large applications.
> >> > > > >
> >> > > > > Regards,
> >> > > > > Nick
> >> > > > >
> >> > > > > 1: The only exception to this I can think of is when a user
> >> decides to
> >> > > > > change the format (Serdes) or semantics of the data in the
> store,
> >> in
> >> > > which
> >> > > > > case they would need to do a full reset by also clearing the
> >> changelog
> >> > > > > topic for that store. Realistically, users that wish to do this
> >> would
> >> > > be
> >> > > > > better off just creating a new store and deleting the old one,
> so
> >> I
> >> > > don't
> >> > > > > think it's a case worth optimizing for.
> >> > > > >
> >> > > > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
> >> > > > > <sop...@confluent.io.invalid> wrote:
> >> > > > >
> >> > > > > > Hey Nick,
> >> > > > > >
> >> > > > > > thanks for the KIP, this is definitely a much-needed feature.
> >> I've
> >> > > > > actually
> >> > > > > > been working on
> >> > > > > > a somewhat similar feature for a while now and have a good
> >> chunk of
> >> > > the
> >> > > > > > implementation
> >> > > > > > completed -- but so far it's only exposed via internal APIs
> and
> >> > > hasn't
> >> > > > > been
> >> > > > > > brought to a KIP
> >> > > > > > yet, as it's a fairly large and complex project and I wanted
> to
> >> get
> >> > > all
> >> > > > > the
> >> > > > > > details hashed out
> >> > > > > > before settling on a public API.
> >> > > > > >
> >> > > > > > For some sense of how complicated it's been, you can check out
> >> the
> >> > > JIRA
> >> > > > > > ticket we've been
> >> > > > > > filing PRs under -- there are already 25 PRs to the feature.
> See
> >> > > > > > KAFKA-12648
> >> > > > > > <https://issues.apache.org/jira/browse/KAFKA-12648>. You can
> >> check
> >> > > > > > out the new KafkaStreamsNamedTopologyWrapper class to see what
> >> the
> >> > > > > current
> >> > > > > > API looks like
> >> > > > > > -- I recommend taking a look to see if this might cover some
> or
> >> all
> >> > > of
> >> > > > > the
> >> > > > > > things you wanted
> >> > > > > > this KIP to do.
> >> > > > > >
> >> > > > > > For a high-level sketch, my work introduces the concept of a
> >> > > > > > "NamedTopology" (which will be
> >> > > > > > renamed to "ModularTopology" in the future, but is still
> >> referred to
> >> > > as
> >> > > > > > "named" in the codebase
> >> > > > > > so I'll keep using it for now) . Each KafkaStreams app can
> >> execute
> >> > > > > multiple
> >> > > > > > named topologies,
> >> > > > > > which are just regular topologies that are given a unique
> name.
> >> The
> >> > > > > > essential feature of a
> >> > > > > > named topology is that it can be dynamically added or removed
> >> without
> >> > > > > even
> >> > > > > > stopping the
> >> > > > > > application, much less resetting it. Technically a
> >> NamedTopology can
> >> > > be
> >> > > > > > composed or one
> >> > > > > > or more subtopologies, but if you want to be able to update
> the
> >> > > > > application
> >> > > > > > at a subtopology
> >> > > > > > level you can just name each  subtopology.
> >> > > > > >
> >> > > > > > So I believe the feature you want is actually already
> >> implemented,
> >> > > for
> >> > > > > the
> >> > > > > > most part -- it's currently
> >> > > > > > missing a few things that I just didn't bother to implement
> yet
> >> since
> >> > > > > I've
> >> > > > > > been focused
> >> > > > > > on getting a working, minimal POC that I could use for
> testing.
> >> (For
> >> > > > > > example it doesn't yet
> >> > > > > > support global state stores) But beyond that, the only
> remaining
> >> > > work to
> >> > > > > > make this feature
> >> > > > > > available is to settle on the APIs, get a KIP passed, and
> >> implement
> >> > > said
> >> > > > > > APIs.
> >> > > > > >
> >> > > > > > Would you be interested in helping out with the NamedTopology
> >> work
> >> > > so we
> >> > > > > > can turn it into a
> >> > > > > > a full-fledged public feature? I'm happy to let you take the
> >> lead on
> >> > > the
> >> > > > > > KIP, maybe by adapting
> >> > > > > > this one if you think it makes sense to do so. The
> NamedTopology
> >> > > feature
> >> > > > > is
> >> > > > > > somewhat larger
> >> > > > > > in scope than strictly necessary for your purposes, however,
> so
> >> you
> >> > > could
> >> > > > > > take on just a part
> >> > > > > > of it and leave anything beyond that for me to do as followup.
> >> > > > > >
> >> > > > > > By the way: one advantage of the NamedTopology feature is that
> >> we
> >> > > don't
> >> > > > > > have to worry about
> >> > > > > > any compatibility issues or upgrade/migration path -- it's
> >> opt-in by
> >> > > > > > definition. (Of course we would
> >> > > > > > recommend using it to all users, like we do with named
> >> operators)
> >> > > > > >
> >> > > > > > Let me know what you think and how you want to proceed from
> >> here -- I
> >> > > > > > wouldn't want you to
> >> > > > > > spend time re-implementing more or less the same thing, but I
> >> most
> >> > > likely
> >> > > > > > wasn't going to find time
> >> > > > > > to put out a KIP for the NamedTopology feature in the near
> >> future.
> >> > > If you
> >> > > > > > would be able to help
> >> > > > > > drive this to completion, we'd each have significantly less
> >> work to
> >> > > do to
> >> > > > > > achieve our goals :)
> >> > > > > >
> >> > > > > > Cheers,
> >> > > > > > Sophie
> >> > > > > >
> >> > > > > >
> >> > > > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <
> >> wangg...@gmail.com>
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Hello Nick,
> >> > > > > > >
> >> > > > > > > Thanks for bringing this up and for the proposed options. I
> >> read
> >> > > though
> >> > > > > > > your writeup and here are some of my thoughts:
> >> > > > > > >
> >> > > > > > > 1) When changing the topology of Kafka Streams, the
> developer
> >> need
> >> > > to
> >> > > > > > first
> >> > > > > > > decide if the whole topology's persisted state (including
> >> both the
> >> > > > > state
> >> > > > > > > store as well as its changelogs, and the repartition topics,
> >> and
> >> > > the
> >> > > > > > > source/sink external topics) or part of the persisted state
> >> can be
> >> > > > > > reused.
> >> > > > > > > This involves two types of changes:
> >> > > > > > >
> >> > > > > > > a) structural change of the topology, such like a new
> >> processor
> >> > > node is
> >> > > > > > > added/removed, a new intermediate topic is added/removed
> etc.
> >> > > > > > > b) semantic change of a processor, such as a numerical
> filter
> >> node
> >> > > > > > changing
> >> > > > > > > its filter threshold etc.
> >> > > > > > >
> >> > > > > > > Today both of them are more or less determined by developers
> >> > > manually.
> >> > > > > > > However, though automatically determining on changes of type
> >> b) is
> >> > > hard
> >> > > > > > if
> >> > > > > > > not possible, automatic determining on the type of a) is
> >> doable
> >> > > since
> >> > > > > > it's
> >> > > > > > > depend on just the information of:
> >> > > > > > > * number of sub-topologies, and their orders (i.e. sequence
> >> of ids)
> >> > > > > > > * used state stores and changelog topics within the
> >> sub-topology
> >> > > > > > > * used repartition topics
> >> > > > > > > * etc
> >> > > > > > >
> >> > > > > > > So let's assume in the long run we can indeed automatically
> >> > > determine
> >> > > > > if
> >> > > > > > a
> >> > > > > > > topology or part of it (a sub-topology) is structurally the
> >> same,
> >> > > what
> >> > > > > we
> >> > > > > > > can do is to "translate" the old persisted state names to
> the
> >> > > > > > > new, isomorphic topology's names. Following this thought I'm
> >> > > leaning
> >> > > > > > > towards the direction of option B in your proposal. But
> since
> >> in
> >> > > this
> >> > > > > KIP
> >> > > > > > > automatic determining structural changes are out of the
> >> scope, I
> >> > > feel
> >> > > > > we
> >> > > > > > > can consider adding some sort of a "migration tool" from an
> >> old
> >> > > > > topology
> >> > > > > > to
> >> > > > > > > new topology by renaming all the persisted states (store
> dirs
> >> and
> >> > > > > names,
> >> > > > > > > topic names).
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <
> >> > > nick.telf...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi everyone,
> >> > > > > > > >
> >> > > > > > > > I'd like to start a discussion on Kafka Streams KIP-816 (
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> >> > > > > > > > )
> >> > > > > > > >
> >> > > > > > > > This KIP outlines 3 possible solutions to the problem,
> and I
> >> > > plan to
> >> > > > > > > > whittle this down to a definitive solution based on this
> >> > > discussion.
> >> > > > > > > >
> >> > > > > > > > Of the 3 proposed solutions:
> >> > > > > > > > * 'A' is probably the "correct" solution, but is also
> quite
> >> a
> >> > > > > > significant
> >> > > > > > > > change.
> >> > > > > > > > * 'B' is the least invasive, but most "hacky" solution.
> >> > > > > > > > * 'C' requires a change to the wire protocol and will
> >> likely have
> >> > > > > > > > unintended consequences. C is also the least complete
> >> solution,
> >> > > and
> >> > > > > > will
> >> > > > > > > > need significant additional work to make it work.
> >> > > > > > > >
> >> > > > > > > > Please let me know if the Motivation and Background
> >> sections need
> >> > > > > more
> >> > > > > > > > clarity.
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > >
> >> > > > > > > > Nick Telford
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > -- Guozhang
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > >
> >> > >
> >>
> >>
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>

Reply via email to