Hi Sophie,

Thanks for the KIP. A very useful proposal!
Some questions:

1. The staticPartition method in the interface is commented out. Is that intentional?

2. For error handling, as you can imagine, errors could happen during
partition expansion. That means the operation could (1) take a long time
to complete, or (2) get stuck somewhere with a fatal error. I'd like to
know how we handle these 2 situations. For (1), I'm wondering if we should
expose some metrics for monitoring, e.g. state, topics to be autoscaled,
etc. For (2), I'm not sure whether having some partitions expanded and
others not will cause any weird issues. If not, maybe just expose a metric
for the autoscaling state, and have a state such as "failed".

3. Could this operation get aborted? I don't think so. Maybe there should
be a note in the KIP.
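
To make (1) and (2) in point 2 a bit more concrete, here is a rough Python
sketch of what a single autoscaling state metric could report. Everything in
it (AutoscalingState, autoscaling_state, the topic names) is made up for
illustration and is not taken from the KIP or the Streams API:

```python
from enum import Enum


class AutoscalingState(Enum):
    """Hypothetical values for an autoscaling state metric (not from the KIP)."""
    IDLE = "idle"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


def autoscaling_state(target, actual, fatal_error=False):
    """Derive one reportable state from per-topic partition counts.

    target: topic -> partition count the expansion should reach
    actual: topic -> partition count currently observed
    Returns (state, sorted list of topics still to be autoscaled).
    """
    pending = sorted(t for t, n in target.items() if actual.get(t, 0) < n)
    if not target:
        return AutoscalingState.IDLE, pending
    if not pending:
        return AutoscalingState.COMPLETED, pending
    if fatal_error:
        # Some topics may already be expanded while others are not.
        return AutoscalingState.FAILED, pending
    return AutoscalingState.IN_PROGRESS, pending


# Partial expansion that hit a fatal error: one topic done, one not.
target = {"app-repartition": 4, "app-changelog": 4}
partial = {"app-repartition": 4, "app-changelog": 2}
state, pending = autoscaling_state(target, partial, fatal_error=True)
print(state.value, pending)
```

The point is only that the "failed" state and the list of topics still to be
autoscaled can come from the same bookkeeping, so a partially expanded set of
topics is visible to whoever is monitoring the application.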

Thank you.
Luke


On Tue, Nov 1, 2022 at 2:15 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Sophie,
>
> Thank you for the KIP!
>
> 1.
> I do not understand how autoscaling should work with a Streams topology
> with a stateful sub-topology that reads from the input topics. The
> simplest example is a topology that consists of only one stateful
> sub-topology. As far as I understand the upstream producer would route
> existing keys to different partitions after the partition expansion than
> before the expansion. That means Streams would -- in general -- not read
> the same keys with the same stream thread after the expansion. I think
> you proposed the solution to this in your last e-mail with the following:
>
> <quote>
> Essentially whoever is responsible for calculating how many partitions
> are needed should also be responsible for directing whichever new keys
> are supposed to go into those new partitions, then pass it along to the
> upstream producer to encode in the record itself.
> </quote>
>
> But I am not 100% sure if you really meant what I understand. If I
> understand it correctly, you propose that the user is responsible for
> producing the records with existing keys to the same partitions as before
> the expansion upstream. I think that is important information that
> should be pointed out in the KIP.
>
>
> 2.
> I would log an error and shut down the Streams application if a custom
> partitioner is used anywhere in the topology. I think that would make
> the limitations clearer and would reduce perceived unexpected behavior
> by the users. Are there any specific reasons you propose to ignore it
> and log a warning?
>
> Best,
> Bruno
>
> On 28.10.22 04:51, Sophie Blee-Goldman wrote:
> > Thanks all! I'll try to address everything but don't hesitate to call me
> > out if anything is missed
> >
> > Colt/Lucas:
> >
> > Thanks for clarifying, I think I understand your example now. Something I
> > didn't think to mention
> > earlier but hopefully clears up how this would be used in practice is
> that
> > the partitioning decision/
> > logic doesn't need to -- and perhaps explicitly should not be -- internal
> > to the StaticStreamPartitioner
> > interface alone. I would imagine a realistic scenario would have the
> > partition essentially determined
> > upstream of the actual application, specifically integrated with whatever
> > system (or person) is
> > making the decision to add new partition(s) in the first place. Then the
> > partitioner is just reading out
> > some field in the record key/value, possibly doing some translation to
> > derive the final partition number
> > from something like a userId if it's not encoded directly, and not
> actually
> > computing anything itself.
> > Does that make sense? Essentially whoever is responsible for calculating
> > how many partitions are
> > needed should also be responsible for directing whichever new keys are
> > supposed to go into those
> > new partitions, then pass it along to the upstream producer to encode in
> > the record itself.
> >
> > In sum, I second what Lucas said about your scenario actually being a
> good
> > example of one way
> > to approach implementing static partitioning, ie based on time. It's just
> > that the semantics/logic to
> > interpret the target partition based on time would be external to the
> > application and not isolated in
> > the actual StaticStreamPartitioner class. Imo this makes perfect sense,
> as
> > something like IQ is
> > also going to be situated outside of the Streams application itself, so
> > presumably it can talk to
> > the system that is responsible for the partitioning logic for any
> partition
> > information it needs.
> >
> > Bill/Sagar:
> >
> > I've been going back and forth a lot on whether to open this feature up
> to
> > stateless applications or
> > even stateful ones as well, but feel like I've settled on having it
> > targeted towards both (but only) the
> > stateless and statically partitioned cases. Bill, my only concern about
> the
> > stateless apps was the
> > possibility for trouble when repartitioning a stateless application that
> > feeds into a stateful application
> > downstream. But now that I think about it, users would actually need to
> > ensure that any/all apps
> > in that pipeline could handle partition increases, so it would be
> > impossible for someone to mess up
> > something downstream with corrupted partitioning because any changes to
> the
> > output topics would
> > of course mean changing the input topics of those downstream apps, and
> they
> > would just shut down
> > if not set up to handle this -- that's the whole point of this KIP. So
> > I'm +1 on including the stateless folks
> >
> > As for stateful applications, I feel pretty strongly that we should
> > discourage users from trying to use
> > the autoscaling feature when state is involved. However, as I touch on
> > again briefly in the API discussion
> > below, there's no way to truly prevent someone from abusing this feature
> if
> > they are determined to. So
> > the idea is really for us to stress and heavily document which kinds of
> > applications can and cannot
> > enable autoscaling and/or be repartitioned without resulting in
> significant
> > corruption of the results.
> >
> > As for key skew, technically anything is possible -- but (a) we're
> > entrusting users to make smart choices
> > throughout this KIP, which includes being careful with the partitioning
> > logic, (b) the real-world use cases
> > I'm aware of that requested this feature were not even susceptible to
> skew
> > from repartitioning as their
> > architecture involved giving each key its own partition, and (c) if key
> > skew is going to become a problem,
> > I would consider that a question for the KIP that introduced partition
> > increases, not an issue with a KIP
> > that's just trying to make Streams compatible with this ability :)
> > But yes, it's always a possibility and nonetheless fair to be concerned.
> > It's worth calling out in the docs
> > somewhere and trying to help users avoid problems with this.
> >
> > Walker:
> >
> > Thanks, yes you are right that there will not be a default implementation
> > provided, and also right that
> > this should have been explicitly called out in the KIP. I've added a note
> > to address this.
> >
> > That said, since we're expanding the feature to include/allow stateless
> > applications as well, I've
> > been mulling over a few possible alternatives or modifications to the
> > currently proposed APIs.
> >
> > 1. We could expand the scope of the new config to enable setting a
> default
> > partitioner across the application regardless of the static condition and
> > autoscaling feature. But
> > if the user passes in a custom partitioner that does implement the new
> > StaticStreamPartitioner
> > interface, then autoscaling will be enabled. Some further options within
> > this scenario:
> >    a. Would we still lock down the partitioning and prevent the static
> > partitioner from being overridden?
> >        My personal preference is "yes", though it is a bit awkward to
> have
> > different semantics depending
> >        on what kind of partitioner is passed in. Therefore I'd propose to
> > always enforce any partitioner
> >        that's passed in as the default, and not allow topology-level
> > overrides. Imo this would also make
> >        the new config safer from user error due to accidental
> > discrepancies throughout the topology
> >    b. How should we expose the feature for stateless apps? We could just
> > offer an OOTB implementation
> >        for stateless apps, which could implement the StreamPartitioner
> > interface directly to circumvent the
> >        awkwardness of implementing an interface whose condition
> (staticness)
> > it doesn't meet. The downside
> >        is that some stateless apps may still want customized partitioning
> > logic. Of course they can just extend
> >        the class, but again it just feels slightly awkward due to the
> > interface/class asymmetry. Alternatively, the
> >        StatelessStreamPartitioner could be an interface in parallel to
> the
> > StaticStreamPartitioner. However, I
> >        anticipate that the vast majority of stateless apps which may want
> > this feature do not use a custom
> >        partitioner, and would be annoyed at having to implement one just
> to
> > unlock autoscaling. So if we did
> >        go this route, we'd probably need a default implementation
> anyways.
> >        That last option would probably be the best user experience, even
> if
> > slightly more work for us/me to
> >        add.
> > 2. Another option is to keep the config semantics the same but change the
> > name to something like
> > 'autoscaling.partitioner.class'. Then we can do something similar to
> what's
> > discussed in 1b, with my
> > preference being to accept either a StaticStreamPartitioner OR
> > implementation of a StatelessStreamPartitioner
> > interface, for which an OOTB default partitioner would also be provided.
> > 3. One last open question here is whether we should try enforcing the
> > statelessness of applications that try
> > to enable autoscaling via whatever API we land on for the stateless case.
> > Personally I'm in favor of this, and
> > users who really want to get around our roadblocks and muck up a stateful
> > app could still get through via
> > the static partitioner. This check would just be an additional guardrail
> > from accidental misuses, not intentional ones
> >
> > What do you all think? Any strong preferences or concerns about any of
> > these API options? Should we expand
> > the config to be useful for any app with custom partitioning, or keep it
> > focused on the autoscaling feature? I do
> > worry a bit that when some users see a new config about enabling
> > autoscaling, they'll get excited and blindly plug
> > in the OOTB partitioner to try it out without really understanding its
> > limitations and intended use. Maybe that's just
> > paranoid, I certainly hope so. Anyways I look forward to hearing all your
> > opinions on the public interface here.
> >
> > Whew, that was a long one, but thanks again to everyone who's joined the
> > discussion so far! You've really helped
> > me to clarify my thoughts and vision for this feature. Looking forward to
> > your replies
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Oct 25, 2022 at 1:45 PM Walker Carlson
> > <wcarl...@confluent.io.invalid> wrote:
> >
> >> Hey Sophie,
> >>
> >> Thanks for the KIP. I think this could be useful for a lot of cases. I
> also
> >> think that this could cause a lot of confusion.
> >>
> >> Just to make sure we are doing our best to prevent people from
> >> misusing this feature, I wanted to clarify a couple of things.
> >> 1) There will be only an interface and no "default" implementation that a
> >> user can plug in for the static partitioner. I am thinking that when it
> >> comes to testing, we want to make sure that we do not make our testing
> >> implementation available to a user.
> >> 2) If a user wanted to use autoscaling for a stateless application, it
> >> should be as easy as implementing the StaticStreamPartitioner. Their
> >> implementation could even just wrap the default partitioner if they wanted,
> >> right? I can't think of any way we could detect and then warn them about
> >> the output topic not being partitioned by keys if that were to happen,
> >> can you?
> >>
> >> Overall this looks good to me!
> >>
> >> Walker
> >>
> >> On Tue, Oct 25, 2022 at 12:27 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >>
> >>> Hi Sophie,
> >>>
> >>> Thanks for the KIP! I think this is a worthwhile feature to add.  I
> have
> >>> two main questions about how this new feature will work.
> >>>
> >>>
> >>>     1. You mention that for stateless applications auto-scaling is a
> >>>     stickier situation. But I was thinking that auto-scaling would
> >>>     actually benefit stateless applications the most; let me explain my
> >>>     thinking. Let's say you have a stateless Kafka Streams application
> >>>     with one input topic and 2 partitions, meaning you're limited to at
> >>>     most 2 stream threads. In order to increase the throughput, you
> >>>     increase the number of partitions of the source topic to 4, so you
> >>>     can run 4 stream threads. In this case, would the auto-scaling
> >>>     feature automatically increase the number of tasks from 2 to 4?
> >>>     Since the application is stateless, say using a filter then a map
> >>>     for example, the partition for the record doesn't matter, so it
> >>>     seems that stateless applications would stand to gain a great deal.
> >>>     2. For stateful applications I can see the immediate benefit from
> >>>     autoscaling and static partitioning. But again going with the
> >>>     partition expansion for increased throughput example, what would be
> >>>     the mitigation strategy for a stateful application that eventually
> >>>     wants to take advantage of the increased number of partitions?
> >>>     Otherwise keeping all keys on their original partition means you
> >>>     could end up with "key skew" due to not allowing keys to distribute
> >>>     out to the new partitions.
> >>>
> >>> One last comment, the KIP states "only the key, rather than the key and
> >>> value, are passed in to the partitioner", but the interface has it
> >> taking a
> >>> key and a value as parameters.  Based on your comments earlier in this
> >>> thread I was thinking that the text needs to be updated.
> >>>
> >>> Thanks,
> >>> Bill
> >>>
> >>> On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy
> >>> <lbruts...@confluent.io.invalid> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> thanks, Sophie, this makes sense. I suppose then the way to help the
> >>>> user not apply this in the wrong setting is having good documentation
> >>>> and one or two examples of good use cases.
> >>>>
> >>>> I think Colt's time-based partitioning is a good example of how to use
> >>>> this. It actually doesn't have to be time, the same will work with any
> >>>> monotonically increasing identifier. I.e. the new partitions will only
> >>> get
> >>>> records for users with a "large" user ID greater than some user ID
> >>>> threshold hardcoded in the static partitioner. At least in this
> >>> restricted
> >>>> use-case, lookups by user ID would still be possible.
> >>>>
> >>>> Cheers,
> >>>> Lucas
> >>>>
> >>>> On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy <c...@littlehorse.io>
> >>> wrote:
> >>>>
> >>>>> Sophie,
> >>>>>
> >>>>> Regarding item "3" (my last paragraph from the previous email),
> >>> perhaps I
> >>>>> should give a more general example now that I've had more time to
> >>> clarify
> >>>>> my thoughts:
> >>>>>
> >>>>> In some stateful applications, certain keys have to be findable
> >> without
> >>>> any
> >>>>> information about when the relevant data was created. For example, if
> >>> I'm
> >>>>> running a word-count app and I want to use Interactive Queries to
> >> find
> >>>> the
> >>>>> count for "foo", I would need to know whether "foo" first arrived
> >>> before
> >>>> or
> >>>>> after time T before I could find the correct partition to look up the
> >>>> data.
> >>>>> In this case, I don't think static partitioning is possible. Is this
> >>>>> use-case a non-goal of the KIP, or am I missing something?
> >>>>>
> >>>>> Colt McNealy
> >>>>> *Founder, LittleHorse.io*
> >>>>>
> >>>>>
> >>>>> On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
> >>>>> <sop...@confluent.io.invalid> wrote:
> >>>>>
> >>>>>> Thanks for the responses guys! I'll get the easy stuff out of the
> >> way
> >>>>>> first:
> >>>>>>
> >>>>>> 1) Fixed the KIP so that StaticStreamPartitioner extends
> >>>>> StreamPartitioner
> >>>>>> 2) I totally agree with you Colt, the record value might have
> >>> valuable
> >>>>> (no
> >>>>>> pun) information
> >>>>>> in it that is needed to compute the partition without breaking the
> >>>> static
> >>>>>> constraint. As in my
> >>>>>> own example earlier, maybe the userId is a field in the value and
> >> not
> >>>> the
> >>>>>> key itself. Actually
> >>>>>> it was that exact thought that made me do a U-turn on this but I
> >>> forgot
> >>>>> to
> >>>>>> update the thread
> >>>>>> 3) Colt, I'm not sure I follow what you're trying to say in that last
> >>>>>> paragraph, can you expand?
> >>>>>> 4) Lucas, it's a good question as to what kind of guard-rails we
> >>> could
> >>>>> put
> >>>>>> up to enforce or even
> >>>>>> detect a violation of static partitioning. Most likely Streams
> >> would
> >>>> need
> >>>>>> to track every key to
> >>>>>> partition mapping in an internal state store, but we have no
> >>> guarantee
> >>>>> the
> >>>>>> key space is bounded
> >>>>>> and the store wouldn't grow out of control. Mostly however I
> >> imagine
> >>>>> users
> >>>>>> would be frustrated
> >>>>>> to find out there's a secret, extra state store taking up space
> >> when
> >>>> you
> >>>>>> enable autoscaling, and
> >>>>>> it's not even to provide functionality but just to make sure users
> >>>> aren't
> >>>>>> doing something wrong.
> >>>>>>
> >>>>>> I wish I had a better idea, but sadly I think the only practical
> >>>> solution
> >>>>>> here is to try and make this
> >>>>>> condition as clear and obvious and easy to understand as possible,
> >>>>> perhaps
> >>>>>> by providing an
> >>>>>> example of what does and does not satisfy the constraint in the
> >>>> javadocs.
> >>>>>> I'll work on that
> >>>>>> 5) I covered a bit above the impracticality of storing a
> >> potentially
> >>>>>> unbounded keyspace, which
> >>>>>> as you mention would need to be shared by all partitioners as well,
> >>> so
> >>>> I
> >>>>>> would agree that this
> >>>>>> feels insurmountable. I'm leaning towards only enabling this
> >> feature
> >>>> for
> >>>>>> the static partitioning
> >>>>>> case at least in the first iteration, and we can see how things go
> >>> from
> >>>>>> there -- for example, are
> >>>>>> people generally able to implement it correctly? If we find that
> >> the
> >>>>>> feature is working well and
> >>>>>> users are hungry for more, then it would be relatively
> >>> straightforward
> >>>> to
> >>>>>> open things up to
> >>>>>> stateless applications, or even stateful applications which can
> >>>> withstand
> >>>>>> some "blips" in the
> >>>>>> logic/correctness.
> >>>>>>
> >>>>>> That said, *technically* the feature would be able to be turned on
> >>> for
> >>>>> any
> >>>>>> such case as it is, since
> >>>>>> as discussed above it's difficult to place true guardrails around
> >> the
> >>>>>> feature that can enforce
> >>>>>> static partitioning. Perhaps we could put a short note in the
> >>>>>> StaticStreamPartitioner docs that explains how and when it's safe to
> >>>>>> break the static requirement, but that we recommend against doing so.
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>> -Sophie
> >>>>>>
> >>>>>> On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Sophie,
> >>>>>>>
> >>>>>>> Thank you for your detailed response. That makes sense (one
> >>> partition
> >>>>> per
> >>>>>>> user seems like a lot of extra metadata if you've got millions of
> >>>>> users,
> >>>>>>> but I'm guessing that was just for illustrative purposes).
> >>>>>>>
> >>>>>>> In this case I'd like to question one small detail in your KIP. The
> >>>>>>> StaticStreamPartitioner takes in just the key and not the value...in an
> >>>>>>> application I've been working on, the "value" is a long-lived
> >>> entity
> >>>>>>> (spanning hundreds of records over several days) that has
> >> timestamp
> >>>>>>> information about the creation of the entity inside of it. The ID
> >>>>> itself
> >>>>>> is
> >>>>>>> provided by the end-user of the system and as such isn't
> >> guaranteed
> >>>> to
> >>>>>> have
> >>>>>>> timestamp info.
> >>>>>>>
> >>>>>>> This is quite a corner case, but if the StaticStreamPartitioner
> >>>>>>> interface were allowed to peek at the record value, it would be
> >>>>>>> trivial to implement logic as follows:
> >>>>>>> ```
> >>>>>>> entity = deserialize(record.value())
> >>>>>>>
> >>>>>>> if entity.created_before(T):
> >>>>>>>    return hash(key) % old_partitions
> >>>>>>> else:
> >>>>>>>    return hash(key) % new_partitions
> >>>>>>> ```
> >>>>>>>
> >>>>>>> That said, you're a rockstar architect and have seen a lot more
> >>>> system
> >>>>>>> design than I have (I'm 23 and only 3 years out of school...you
> >>>>>> implemented
> >>>>>>> cooperative rebalancing 😀). So don't make that decision unless
> >> you
> >>>> can
> >>>>>> see
> >>>>>>> other use-cases where it is appropriate.
> >>>>>>>
> >>>>>>> Additionally, for my own use-case I'm not sure if static
> >>> partitioning
> >>>>>> alone
> >>>>>>> (as opposed to re-partitioning and re-playing the changelogs into
> >>> new
> >>>>>>> stores) would enable auto-scaleout because my system uses Kafka
> >>>> Streams
> >>>>>> as
> >>>>>>> the data store *and* a secondary index...for example, when a user
> >>>> wants
> >>>>>> to
> >>>>>>> look up all entities where the variable `user_email==f...@bar.com
> >> `,
> >>>> we
> >>>>>> have
> >>>>>>> an index store that has keys partitioned by and prefixed with
> >>>>>> `user_email==
> >>>>>>> f...@bar.com`. Entities with that email (for example) could come
> >>>> before
> >>>>>> or
> >>>>>>> after time T.
> >>>>>>>
> >>>>>>> Anyways, that's just my twopence, if I were a voting committer
> >> I'd
> >>>> vote
> >>>>>> for
> >>>>>>> this KIP as-is.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Colt McNealy
> >>>>>>> *Founder, LittleHorse.io*
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> >>>>>>> <sop...@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> Thanks for your questions, I would say that your understanding
> >>>> sounds
> >>>>>>>> correct based
> >>>>>>>> on what you described but I'll try to add some clarity. The
> >> basic
> >>>>> idea
> >>>>>> is
> >>>>>>>> that, as you said,
> >>>>>>>> any keys that are processed before time T will go to partition
> >> 1.
> >>>> All
> >>>>>> of
> >>>>>>>> those keys should
> >>>>>>>> then continue to be routed to partition 1 for the remainder of
> >>> the
> >>>>>> app's
> >>>>>>>> lifetime, if you care
> >>>>>>>> about maintaining correct history/"state" for that key (I'll
> >> come
> >>>>> back
> >>>>>> to
> >>>>>>>> this in the next
> >>>>>>>> paragraph). After the time T, new keys that weren't processed
> >>> prior
> >>>>> to
> >>>>>> T
> >>>>>>>> may be routed to
> >>>>>>>> either partition, provided they are similarly mapped to the
> >> same
> >>>>>>> partition
> >>>>>>>> forever after. It's
> >>>>>>>> up to the user to enforce this, perhaps by trying to keep track
> >>> of
> >>>>> all
> >>>>>>> keys
> >>>>>>>> but that is likely to
> >>>>>>>> be impractical. This feature is generally more targeted at
> >> cases
> >>>>> where
> >>>>>>> the
> >>>>>>>> partition mapping
> >>>>>>>> is "obvious" enough to compute without needing to maintain a
> >>>> history
> >>>>> of
> >>>>>>> all
> >>>>>>>> keys and their
> >>>>>>>> original partition: for example, imagine an application that
> >>>>> processes
> >>>>>>> user
> >>>>>>>> account information.
> >>>>>>>> You can scale out to a partition per user, and add a new
> >>> partition
> >>>>> each
> >>>>>>>> time someone opens
> >>>>>>>> a new account. When they open that account they get a userID
> >>>> number,
> >>>>>>>> starting with #0 and
> >>>>>>>> counting up from there. In that case, the partition for any
> >>> records
> >>>>>>>> pertaining to a given account
> >>>>>>>> would just be its userID.
> >>>>>>>>
> >>>>>>>> I hope that clears up the kind of intended use case we're
> >>> targeting
> >>>>>> with
> >>>>>>>> this feature. That said,
> >>>>>>>> another important and equally viable use case that I neglected
> >> to
> >>>>>> mention
> >>>>>>>> in the KIP is fully
> >>>>>>>> stateless applications. Technically this feature can produce
> >>>> correct
> >>>>>>>> results for applications that
> >>>>>>>> are at least one of (a) statically partitioned, or (b)
> >> completely
> >>>>>>>> stateless. However, the stateless
> >>>>>>>> case is a bit stickier since even if the Streams application
> >>> itself
> >>>>>>> doesn't
> >>>>>>>> care about maintaining
> >>>>>>>> the same mapping of key to partition, it could for example be
> >>>> feeding
> >>>>>>> into
> >>>>>>>> a downstream
> >>>>>>>> application which *does* need to maintain state, and which
> >> would
> >>>> wind
> >>>>>> up
> >>>>>>>> "losing" the history for
> >>>>>>>> any keys that changed partition.
> >>>>>>>>
> >>>>>>>> I kind of felt like opening this feature up to stateless
> >>>> applications
> >>>>>>> would
> >>>>>>>> be asking for trouble and
> >>>>>>>> make it too easy for people to shoot themselves in the foot.
> >> That
> >>>>> said,
> >>>>>>> I'm
> >>>>>>>> open to discussion on
> >>>>>>>> this point if you feel like the benefits here outweigh the
> >> risks.
> >>>> I'm
> >>>>>>> also
> >>>>>>>> happy to consider modifying
> >>>>>>>> the API so that it could naturally be expanded to include
> >>> stateless
> >>>>>>>> applications  in the future, even
> >>>>>>>> if we decide against allowing that use case in the first
> >>> iteration
> >>>> of
> >>>>>> the
> >>>>>>>> feature.
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>> Sophie
> >>>>>>>>
> >>>>>>>> On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <
> >>> c...@littlehorse.io>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sophie,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP! Choosing the number of partitions in a
> >>>>> Streams
> >>>>>>> app
> >>>>>>>>> is a tricky task because of how difficult it is to
> >>> re-partition;
> >>>>> I'm
> >>>>>>> glad
> >>>>>>>>> you're working on an improvement. I've got two questions:
> >>>>>>>>>
> >>>>>>>>> First, `StaticStreamPartitioner` is an interface that we (Streams
> >>>>>>>>> users) must implement, and I'm trying to understand how it would work.
> >> For
> >>>>>>> example,
> >>>>>>>>> let's say there's some point in time 'T' before which we
> >> have 1
> >>>>>>>> partition.
> >>>>>>>>> Then we decide to increase the partition count to 2 at time
> >> T.
> >>>> From
> >>>>>> my
> >>>>>>>>> understanding, all keys that had passed through the Streams
> >> app
> >>>>>> before
> >>>>>>>> time
> >>>>>>>>> T must end up on partition 1 if they appear again in the
> >> input
> >>>>>> topics;
> >>>>>>>> but
> >>>>>>>>> any new keys are allowed to be sent to partition 2. Is that
> >>>>> correct?
> >>>>>>> And
> >>>>>>>>> (pardon the naive question) how is this achieved without
> >>> keeping
> >>>>>> track
> >>>>>>> of
> >>>>>>>>> all keys that have been seen at any point?
> >>>>>>>>>
> >>>>>>>>> Secondly, will this feature work with applications that use
> >>>>>> interactive
> >>>>>>>>> queries?
> >>>>>>>>>
> >>>>>>>>> Thank you very much,
> >>>>>>>>> Colt McNealy
> >>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> >>>>>>>>> <sop...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey all,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to propose a new autoscaling feature for Kafka
> >>> Streams
> >>>>>>>>>> applications which can follow the constraint of static
> >>>>>> partitioning.
> >>>>>>>> For
> >>>>>>>>>> further details please refer to the KIP document:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> >>>>>>>>>>
> >>>>>>>>>> This feature will be targeted for 3.4 but may not be fully
> >>>>>>> implemented
> >>>>>>>>>> until the following release, 3.5.
> >>>>>>>>>>
> >>>>>>>>>> Please give this a read and let me know what you think!
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Sophie
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
